795 lines
30 KiB
Python
795 lines
30 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
import sqlalchemy
|
|
import warnings
|
|
|
|
from sqlalchemy import *
|
|
|
|
from migrate import changeset, exceptions
|
|
from migrate.changeset import *
|
|
from migrate.changeset.schema import ColumnDelta
|
|
from migrate.tests import fixture
|
|
from migrate.tests.fixture.warnings import catch_warnings
|
|
|
|
class TestAddDropColumn(fixture.DB):
|
|
"""Test add/drop column through all possible interfaces
|
|
also test for constraints
|
|
"""
|
|
level = fixture.DB.CONNECT
|
|
table_name = 'tmp_adddropcol'
|
|
table_int = 0
|
|
|
|
def _setup(self, url):
|
|
super(TestAddDropColumn, self)._setup(url)
|
|
self.meta = MetaData()
|
|
self.table = Table(self.table_name, self.meta,
|
|
Column('id', Integer, unique=True),
|
|
)
|
|
self.meta.bind = self.engine
|
|
if self.engine.has_table(self.table.name):
|
|
self.table.drop()
|
|
self.table.create()
|
|
|
|
def _teardown(self):
|
|
if self.engine.has_table(self.table.name):
|
|
self.table.drop()
|
|
self.meta.clear()
|
|
super(TestAddDropColumn,self)._teardown()
|
|
|
|
def run_(self, create_column_func, drop_column_func, *col_p, **col_k):
|
|
col_name = 'data'
|
|
|
|
def assert_numcols(num_of_expected_cols):
|
|
# number of cols should be correct in table object and in database
|
|
self.refresh_table(self.table_name)
|
|
result = len(self.table.c)
|
|
|
|
self.assertEquals(result, num_of_expected_cols),
|
|
if col_k.get('primary_key', None):
|
|
# new primary key: check its length too
|
|
result = len(self.table.primary_key)
|
|
self.assertEquals(result, num_of_expected_cols)
|
|
|
|
# we have 1 columns and there is no data column
|
|
assert_numcols(1)
|
|
self.assertTrue(getattr(self.table.c, 'data', None) is None)
|
|
if len(col_p) == 0:
|
|
col_p = [String(40)]
|
|
col = Column(col_name, *col_p, **col_k)
|
|
create_column_func(col)
|
|
assert_numcols(2)
|
|
# data column exists
|
|
self.assert_(self.table.c.data.type.length, 40)
|
|
|
|
col2 = self.table.c.data
|
|
drop_column_func(col2)
|
|
assert_numcols(1)
|
|
|
|
@fixture.usedb()
|
|
def test_undefined(self):
|
|
"""Add/drop columns not yet defined in the table"""
|
|
def add_func(col):
|
|
return create_column(col, self.table)
|
|
def drop_func(col):
|
|
return drop_column(col, self.table)
|
|
return self.run_(add_func, drop_func)
|
|
|
|
@fixture.usedb()
|
|
def test_defined(self):
|
|
"""Add/drop columns already defined in the table"""
|
|
def add_func(col):
|
|
self.meta.clear()
|
|
self.table = Table(self.table_name, self.meta,
|
|
Column('id', Integer, primary_key=True),
|
|
col,
|
|
)
|
|
return create_column(col)
|
|
def drop_func(col):
|
|
return drop_column(col)
|
|
return self.run_(add_func, drop_func)
|
|
|
|
@fixture.usedb()
|
|
def test_method_bound(self):
|
|
"""Add/drop columns via column methods; columns bound to a table
|
|
ie. no table parameter passed to function
|
|
"""
|
|
def add_func(col):
|
|
self.assert_(col.table is None, col.table)
|
|
self.table.append_column(col)
|
|
return col.create()
|
|
def drop_func(col):
|
|
#self.assert_(col.table is None,col.table)
|
|
#self.table.append_column(col)
|
|
return col.drop()
|
|
return self.run_(add_func, drop_func)
|
|
|
|
@fixture.usedb()
|
|
def test_method_notbound(self):
|
|
"""Add/drop columns via column methods; columns not bound to a table"""
|
|
def add_func(col):
|
|
return col.create(self.table)
|
|
def drop_func(col):
|
|
return col.drop(self.table)
|
|
return self.run_(add_func, drop_func)
|
|
|
|
@fixture.usedb()
|
|
def test_tablemethod_obj(self):
|
|
"""Add/drop columns via table methods; by column object"""
|
|
def add_func(col):
|
|
return self.table.create_column(col)
|
|
def drop_func(col):
|
|
return self.table.drop_column(col)
|
|
return self.run_(add_func, drop_func)
|
|
|
|
@fixture.usedb()
|
|
def test_tablemethod_name(self):
|
|
"""Add/drop columns via table methods; by column name"""
|
|
def add_func(col):
|
|
# must be bound to table
|
|
self.table.append_column(col)
|
|
return self.table.create_column(col.name)
|
|
def drop_func(col):
|
|
# Not necessarily bound to table
|
|
return self.table.drop_column(col.name)
|
|
return self.run_(add_func, drop_func)
|
|
|
|
@fixture.usedb()
|
|
def test_byname(self):
|
|
"""Add/drop columns via functions; by table object and column name"""
|
|
def add_func(col):
|
|
self.table.append_column(col)
|
|
return create_column(col.name, self.table)
|
|
def drop_func(col):
|
|
return drop_column(col.name, self.table)
|
|
return self.run_(add_func, drop_func)
|
|
|
|
@fixture.usedb()
|
|
def test_drop_column_not_in_table(self):
|
|
"""Drop column by name"""
|
|
def add_func(col):
|
|
return self.table.create_column(col)
|
|
def drop_func(col):
|
|
self.table.c.remove(col)
|
|
return self.table.drop_column(col.name)
|
|
self.run_(add_func, drop_func)
|
|
|
|
@fixture.usedb()
|
|
def test_fk(self):
|
|
"""Can create columns with foreign keys"""
|
|
# create FK's target
|
|
reftable = Table('tmp_ref', self.meta,
|
|
Column('id', Integer, primary_key=True),
|
|
)
|
|
if self.engine.has_table(reftable.name):
|
|
reftable.drop()
|
|
reftable.create()
|
|
|
|
# create column with fk
|
|
col = Column('data', Integer, ForeignKey(reftable.c.id))
|
|
if self.url.startswith('sqlite'):
|
|
self.assertRaises(exceptions.NotSupportedError,
|
|
col.create, self.table)
|
|
else:
|
|
col.create(self.table)
|
|
|
|
# check if constraint is added
|
|
for cons in self.table.constraints:
|
|
if isinstance(cons, sqlalchemy.schema.ForeignKeyConstraint):
|
|
break
|
|
else:
|
|
self.fail('No constraint found')
|
|
|
|
# TODO: test on db level if constraints work
|
|
|
|
self.assertEqual(reftable.c.id.name, col.foreign_keys[0].column.name)
|
|
col.drop(self.table)
|
|
|
|
if self.engine.has_table(reftable.name):
|
|
reftable.drop()
|
|
|
|
@fixture.usedb(not_supported='sqlite')
|
|
def test_pk(self):
|
|
"""Can create columns with primary key"""
|
|
col = Column('data', Integer, nullable=False)
|
|
self.assertRaises(exceptions.InvalidConstraintError,
|
|
col.create, self.table, primary_key_name=True)
|
|
col.create(self.table, primary_key_name='data_pkey')
|
|
|
|
# check if constraint was added (cannot test on objects)
|
|
self.table.insert(values={'data': 4}).execute()
|
|
try:
|
|
self.table.insert(values={'data': 4}).execute()
|
|
except (sqlalchemy.exc.IntegrityError,
|
|
sqlalchemy.exc.ProgrammingError):
|
|
pass
|
|
else:
|
|
self.fail()
|
|
|
|
col.drop()
|
|
|
|
@fixture.usedb(not_supported='mysql')
|
|
def test_check(self):
|
|
"""Can create columns with check constraint"""
|
|
col = Column('data',
|
|
Integer,
|
|
sqlalchemy.schema.CheckConstraint('data > 4'))
|
|
col.create(self.table)
|
|
|
|
# check if constraint was added (cannot test on objects)
|
|
self.table.insert(values={'data': 5}).execute()
|
|
try:
|
|
self.table.insert(values={'data': 3}).execute()
|
|
except (sqlalchemy.exc.IntegrityError,
|
|
sqlalchemy.exc.ProgrammingError):
|
|
pass
|
|
else:
|
|
self.fail()
|
|
|
|
col.drop()
|
|
|
|
@fixture.usedb(not_supported='sqlite')
|
|
def test_unique(self):
|
|
"""Can create columns with unique constraint"""
|
|
self.assertRaises(exceptions.InvalidConstraintError,
|
|
Column('data', Integer, unique=True).create, self.table)
|
|
col = Column('data', Integer)
|
|
col.create(self.table, unique_name='data_unique')
|
|
|
|
# check if constraint was added (cannot test on objects)
|
|
self.table.insert(values={'data': 5}).execute()
|
|
try:
|
|
self.table.insert(values={'data': 5}).execute()
|
|
except (sqlalchemy.exc.IntegrityError,
|
|
sqlalchemy.exc.ProgrammingError):
|
|
pass
|
|
else:
|
|
self.fail()
|
|
|
|
col.drop(self.table)
|
|
|
|
# TODO: remove already attached columns with indexes, uniques, pks, fks ..
|
|
@fixture.usedb()
|
|
def test_index(self):
|
|
"""Can create columns with indexes"""
|
|
self.assertRaises(exceptions.InvalidConstraintError,
|
|
Column('data', Integer).create, self.table, index_name=True)
|
|
col = Column('data', Integer)
|
|
col.create(self.table, index_name='ix_data')
|
|
|
|
# check if index was added
|
|
self.table.insert(values={'data': 5}).execute()
|
|
try:
|
|
self.table.insert(values={'data': 5}).execute()
|
|
except (sqlalchemy.exc.IntegrityError,
|
|
sqlalchemy.exc.ProgrammingError):
|
|
pass
|
|
else:
|
|
self.fail()
|
|
|
|
Index('ix_data', col).drop(bind=self.engine)
|
|
col.drop()
|
|
|
|
@fixture.usedb()
|
|
def test_server_defaults(self):
|
|
"""Can create columns with server_default values"""
|
|
col = Column('data', String(244), server_default='foobar')
|
|
col.create(self.table)
|
|
|
|
self.table.insert(values={'id': 10}).execute()
|
|
row = self._select_row()
|
|
self.assertEqual(u'foobar', row['data'])
|
|
|
|
col.drop()
|
|
|
|
@fixture.usedb()
|
|
def test_populate_default(self):
|
|
"""Test populate_default=True"""
|
|
def default():
|
|
return 'foobar'
|
|
col = Column('data', String(244), default=default)
|
|
col.create(self.table, populate_default=True)
|
|
|
|
self.table.insert(values={'id': 10}).execute()
|
|
row = self._select_row()
|
|
self.assertEqual(u'foobar', row['data'])
|
|
|
|
col.drop()
|
|
|
|
# TODO: test sequence
|
|
# TODO: test quoting
|
|
# TODO: test non-autoname constraints
|
|
|
|
|
|
class TestRename(fixture.DB):
|
|
"""Tests for table and index rename methods"""
|
|
level = fixture.DB.CONNECT
|
|
meta = MetaData()
|
|
|
|
def _setup(self, url):
|
|
super(TestRename, self)._setup(url)
|
|
self.meta.bind = self.engine
|
|
|
|
@fixture.usedb(not_supported='firebird')
|
|
def test_rename_table(self):
|
|
"""Tables can be renamed"""
|
|
c_name = 'col_1'
|
|
table_name1 = 'name_one'
|
|
table_name2 = 'name_two'
|
|
index_name1 = 'x' + table_name1
|
|
index_name2 = 'x' + table_name2
|
|
|
|
self.meta.clear()
|
|
self.column = Column(c_name, Integer)
|
|
self.table = Table(table_name1, self.meta, self.column)
|
|
self.index = Index(index_name1, self.column, unique=False)
|
|
|
|
if self.engine.has_table(self.table.name):
|
|
self.table.drop()
|
|
if self.engine.has_table(table_name2):
|
|
tmp = Table(table_name2, self.meta, autoload=True)
|
|
tmp.drop()
|
|
tmp.deregister()
|
|
del tmp
|
|
self.table.create()
|
|
|
|
def assert_table_name(expected, skip_object_check=False):
|
|
"""Refresh a table via autoload
|
|
SA has changed some since this test was written; we now need to do
|
|
meta.clear() upon reloading a table - clear all rather than a
|
|
select few. So, this works only if we're working with one table at
|
|
a time (else, others will vanish too).
|
|
"""
|
|
if not skip_object_check:
|
|
# Table object check
|
|
self.assertEquals(self.table.name,expected)
|
|
newname = self.table.name
|
|
else:
|
|
# we know the object's name isn't consistent: just assign it
|
|
newname = expected
|
|
# Table DB check
|
|
self.meta.clear()
|
|
self.table = Table(newname, self.meta, autoload=True)
|
|
self.assertEquals(self.table.name, expected)
|
|
|
|
def assert_index_name(expected, skip_object_check=False):
|
|
if not skip_object_check:
|
|
# Index object check
|
|
self.assertEquals(self.index.name, expected)
|
|
else:
|
|
# object is inconsistent
|
|
self.index.name = expected
|
|
# TODO: Index DB check
|
|
|
|
try:
|
|
# Table renames
|
|
assert_table_name(table_name1)
|
|
rename_table(self.table, table_name2)
|
|
assert_table_name(table_name2)
|
|
self.table.rename(table_name1)
|
|
assert_table_name(table_name1)
|
|
|
|
# test by just the string
|
|
rename_table(table_name1, table_name2, engine=self.engine)
|
|
assert_table_name(table_name2, True) # object not updated
|
|
|
|
# Index renames
|
|
if self.url.startswith('sqlite') or self.url.startswith('mysql'):
|
|
self.assertRaises(exceptions.NotSupportedError,
|
|
self.index.rename, index_name2)
|
|
else:
|
|
assert_index_name(index_name1)
|
|
rename_index(self.index, index_name2, engine=self.engine)
|
|
assert_index_name(index_name2)
|
|
self.index.rename(index_name1)
|
|
assert_index_name(index_name1)
|
|
|
|
# test by just the string
|
|
rename_index(index_name1, index_name2, engine=self.engine)
|
|
assert_index_name(index_name2, True)
|
|
|
|
finally:
|
|
if self.table.exists():
|
|
self.table.drop()
|
|
|
|
|
|
class TestColumnChange(fixture.DB):
|
|
level = fixture.DB.CONNECT
|
|
table_name = 'tmp_colchange'
|
|
|
|
def _setup(self, url):
|
|
super(TestColumnChange, self)._setup(url)
|
|
self.meta = MetaData(self.engine)
|
|
self.table = Table(self.table_name, self.meta,
|
|
Column('id', Integer, primary_key=True),
|
|
Column('data', String(40), server_default=DefaultClause("tluafed"),
|
|
nullable=True),
|
|
)
|
|
if self.table.exists():
|
|
self.table.drop()
|
|
try:
|
|
self.table.create()
|
|
except sqlalchemy.exceptions.SQLError, e:
|
|
# SQLite: database schema has changed
|
|
if not self.url.startswith('sqlite://'):
|
|
raise
|
|
|
|
def _teardown(self):
|
|
if self.table.exists():
|
|
try:
|
|
self.table.drop(self.engine)
|
|
except sqlalchemy.exceptions.SQLError,e:
|
|
# SQLite: database schema has changed
|
|
if not self.url.startswith('sqlite://'):
|
|
raise
|
|
super(TestColumnChange, self)._teardown()
|
|
|
|
@fixture.usedb()
|
|
def test_rename(self):
|
|
"""Can rename a column"""
|
|
def num_rows(col, content):
|
|
return len(list(self.table.select(col == content).execute()))
|
|
# Table content should be preserved in changed columns
|
|
content = "fgsfds"
|
|
self.engine.execute(self.table.insert(), data=content, id=42)
|
|
self.assertEquals(num_rows(self.table.c.data, content), 1)
|
|
|
|
# ...as a function, given a column object and the new name
|
|
alter_column('data', name='data2', table=self.table)
|
|
self.refresh_table()
|
|
alter_column(self.table.c.data2, name='atad')
|
|
self.refresh_table(self.table.name)
|
|
self.assert_('data' not in self.table.c.keys())
|
|
self.assert_('atad' in self.table.c.keys())
|
|
self.assertEquals(num_rows(self.table.c.atad, content), 1)
|
|
|
|
# ...as a method, given a new name
|
|
self.table.c.atad.alter(name='data')
|
|
self.refresh_table(self.table.name)
|
|
self.assert_('atad' not in self.table.c.keys())
|
|
self.table.c.data # Should not raise exception
|
|
self.assertEquals(num_rows(self.table.c.data, content), 1)
|
|
|
|
# ...as a function, given a new object
|
|
alter_column(self.table.c.data,
|
|
name = 'atad', type=String(40),
|
|
server_default=self.table.c.data.server_default)
|
|
self.refresh_table(self.table.name)
|
|
self.assert_('data' not in self.table.c.keys())
|
|
self.table.c.atad # Should not raise exception
|
|
self.assertEquals(num_rows(self.table.c.atad, content), 1)
|
|
|
|
# ...as a method, given a new object
|
|
self.table.c.atad.alter(
|
|
name='data',type=String(40),
|
|
server_default=self.table.c.atad.server_default
|
|
)
|
|
self.refresh_table(self.table.name)
|
|
self.assert_('atad' not in self.table.c.keys())
|
|
self.table.c.data # Should not raise exception
|
|
self.assertEquals(num_rows(self.table.c.data,content), 1)
|
|
|
|
@fixture.usedb()
|
|
def test_type(self):
|
|
# Test we can change a column's type
|
|
|
|
# Just the new type
|
|
self.table.c.data.alter(type=String(43))
|
|
self.refresh_table(self.table.name)
|
|
self.assert_(isinstance(self.table.c.data.type, String))
|
|
self.assertEquals(self.table.c.data.type.length, 43)
|
|
|
|
# Different type
|
|
self.assert_(isinstance(self.table.c.id.type, Integer))
|
|
self.assertEquals(self.table.c.id.nullable, False)
|
|
|
|
if not self.engine.name == 'firebird':
|
|
self.table.c.id.alter(type=String(20))
|
|
self.assertEquals(self.table.c.id.nullable, False)
|
|
self.refresh_table(self.table.name)
|
|
self.assert_(isinstance(self.table.c.id.type, String))
|
|
|
|
@fixture.usedb()
|
|
def test_default(self):
|
|
"""Can change a column's server_default value (DefaultClauses only)
|
|
Only DefaultClauses are changed here: others are managed by the
|
|
application / by SA
|
|
"""
|
|
self.assertEquals(self.table.c.data.server_default.arg, 'tluafed')
|
|
|
|
# Just the new default
|
|
default = 'my_default'
|
|
self.table.c.data.alter(server_default=DefaultClause(default))
|
|
self.refresh_table(self.table.name)
|
|
#self.assertEquals(self.table.c.data.server_default.arg,default)
|
|
# TextClause returned by autoload
|
|
self.assert_(default in str(self.table.c.data.server_default.arg))
|
|
self.engine.execute(self.table.insert(), id=12)
|
|
row = self._select_row()
|
|
self.assertEqual(row['data'], default)
|
|
|
|
# Column object
|
|
default = 'your_default'
|
|
self.table.c.data.alter(type=String(40), server_default=DefaultClause(default))
|
|
self.refresh_table(self.table.name)
|
|
self.assert_(default in str(self.table.c.data.server_default.arg))
|
|
|
|
# Drop/remove default
|
|
self.table.c.data.alter(server_default=None)
|
|
self.assertEqual(self.table.c.data.server_default, None)
|
|
|
|
self.refresh_table(self.table.name)
|
|
# server_default isn't necessarily None for Oracle
|
|
#self.assert_(self.table.c.data.server_default is None,self.table.c.data.server_default)
|
|
self.engine.execute(self.table.insert(), id=11)
|
|
if SQLA_06:
|
|
row = self.table.select(self.table.c.id == 11).execution_options(autocommit=True).execute().fetchone()
|
|
else:
|
|
row = self.table.select(self.table.c.id == 11, autocommit=True).execute().fetchone()
|
|
self.assert_(row['data'] is None, row['data'])
|
|
|
|
@fixture.usedb(not_supported='firebird')
|
|
def test_null(self):
|
|
"""Can change a column's null constraint"""
|
|
self.assertEquals(self.table.c.data.nullable, True)
|
|
|
|
# Full column
|
|
self.table.c.data.alter(type=String(40), nullable=False)
|
|
self.table.nullable = None
|
|
self.refresh_table(self.table.name)
|
|
self.assertEquals(self.table.c.data.nullable, False)
|
|
|
|
# Just the new status
|
|
self.table.c.data.alter(nullable=True)
|
|
self.refresh_table(self.table.name)
|
|
self.assertEquals(self.table.c.data.nullable, True)
|
|
|
|
@fixture.usedb()
|
|
def test_alter_deprecated(self):
|
|
try:
|
|
# py 2.4 compatability :-/
|
|
cw = catch_warnings(record=True)
|
|
w = cw.__enter__()
|
|
|
|
warnings.simplefilter("always")
|
|
self.table.c.data.alter(Column('data', String(100)))
|
|
|
|
self.assertEqual(len(w),1)
|
|
self.assertTrue(issubclass(w[-1].category,
|
|
MigrateDeprecationWarning))
|
|
self.assertEqual(
|
|
'Passing a Column object to alter_column is deprecated. '
|
|
'Just pass in keyword parameters instead.',
|
|
str(w[-1].message))
|
|
finally:
|
|
cw.__exit__()
|
|
|
|
@fixture.usedb()
|
|
def test_alter_metadata(self):
|
|
"""Test if alter_metadata is respected"""
|
|
|
|
self.table.c.data.alter(type=String(100))
|
|
|
|
self.assert_(isinstance(self.table.c.data.type, String))
|
|
self.assertEqual(self.table.c.data.type.length, 100)
|
|
|
|
# nothing should change
|
|
self.table.c.data.alter(type=String(200),alter_metadata=False)
|
|
self.assert_(isinstance(self.table.c.data.type, String))
|
|
self.assertEqual(self.table.c.data.type.length, 100)
|
|
|
|
@fixture.usedb()
|
|
def test_alter_returns_delta(self):
|
|
"""Test if alter constructs return delta"""
|
|
|
|
delta = self.table.c.data.alter(type=String(100))
|
|
self.assert_('type' in delta)
|
|
|
|
@fixture.usedb()
|
|
def test_alter_all(self):
|
|
"""Tests all alter changes at one time"""
|
|
# test for each db separately
|
|
# since currently some dont support everything
|
|
|
|
# test pre settings
|
|
self.assertEqual(self.table.c.data.nullable, True)
|
|
self.assertEqual(self.table.c.data.server_default.arg, 'tluafed')
|
|
self.assertEqual(self.table.c.data.name, 'data')
|
|
self.assertTrue(isinstance(self.table.c.data.type, String))
|
|
self.assertTrue(self.table.c.data.type.length, 40)
|
|
|
|
kw = dict(nullable=False,
|
|
server_default='foobar',
|
|
name='data_new',
|
|
type=String(50),
|
|
alter_metadata=True)
|
|
if self.engine.name == 'firebird':
|
|
del kw['nullable']
|
|
self.table.c.data.alter(**kw)
|
|
|
|
# test altered objects
|
|
self.assertEqual(self.table.c.data.server_default.arg, 'foobar')
|
|
if not self.engine.name == 'firebird':
|
|
self.assertEqual(self.table.c.data.nullable, False)
|
|
self.assertEqual(self.table.c.data.name, 'data_new')
|
|
self.assertEqual(self.table.c.data.type.length, 50)
|
|
|
|
self.refresh_table(self.table.name)
|
|
|
|
# test post settings
|
|
if not self.engine.name == 'firebird':
|
|
self.assertEqual(self.table.c.data_new.nullable, False)
|
|
self.assertEqual(self.table.c.data_new.name, 'data_new')
|
|
self.assertTrue(isinstance(self.table.c.data_new.type, String))
|
|
self.assertTrue(self.table.c.data_new.type.length, 50)
|
|
|
|
# insert data and assert default
|
|
self.table.insert(values={'id': 10}).execute()
|
|
row = self._select_row()
|
|
self.assertEqual(u'foobar', row['data_new'])
|
|
|
|
|
|
class TestColumnDelta(fixture.DB):
|
|
"""Tests ColumnDelta class"""
|
|
|
|
level = fixture.DB.CONNECT
|
|
table_name = 'tmp_coldelta'
|
|
table_int = 0
|
|
|
|
def _setup(self, url):
|
|
super(TestColumnDelta, self)._setup(url)
|
|
self.meta = MetaData()
|
|
self.table = Table(self.table_name, self.meta,
|
|
Column('ids', String(10)),
|
|
)
|
|
self.meta.bind = self.engine
|
|
if self.engine.has_table(self.table.name):
|
|
self.table.drop()
|
|
self.table.create()
|
|
|
|
def _teardown(self):
|
|
if self.engine.has_table(self.table.name):
|
|
self.table.drop()
|
|
self.meta.clear()
|
|
super(TestColumnDelta,self)._teardown()
|
|
|
|
def mkcol(self, name='id', type=String, *p, **k):
|
|
return Column(name, type, *p, **k)
|
|
|
|
def verify(self, expected, original, *p, **k):
|
|
self.delta = ColumnDelta(original, *p, **k)
|
|
result = self.delta.keys()
|
|
result.sort()
|
|
self.assertEquals(expected, result)
|
|
return self.delta
|
|
|
|
def test_deltas_two_columns(self):
|
|
"""Testing ColumnDelta with two columns"""
|
|
col_orig = self.mkcol(primary_key=True)
|
|
col_new = self.mkcol(name='ids', primary_key=True)
|
|
self.verify([], col_orig, col_orig)
|
|
self.verify(['name'], col_orig, col_orig, 'ids')
|
|
self.verify(['name'], col_orig, col_orig, name='ids')
|
|
self.verify(['name'], col_orig, col_new)
|
|
self.verify(['name', 'type'], col_orig, col_new, type=String)
|
|
|
|
# Type comparisons
|
|
self.verify([], self.mkcol(type=String), self.mkcol(type=String))
|
|
self.verify(['type'], self.mkcol(type=String), self.mkcol(type=Integer))
|
|
self.verify(['type'], self.mkcol(type=String), self.mkcol(type=String(42)))
|
|
self.verify([], self.mkcol(type=String(42)), self.mkcol(type=String(42)))
|
|
self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=String(42)))
|
|
self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=Text(24)))
|
|
|
|
# Other comparisons
|
|
self.verify(['primary_key'], self.mkcol(nullable=False), self.mkcol(primary_key=True))
|
|
|
|
# PK implies nullable=False
|
|
self.verify(['nullable', 'primary_key'], self.mkcol(nullable=True), self.mkcol(primary_key=True))
|
|
self.verify([], self.mkcol(primary_key=True), self.mkcol(primary_key=True))
|
|
self.verify(['nullable'], self.mkcol(nullable=True), self.mkcol(nullable=False))
|
|
self.verify([], self.mkcol(nullable=True), self.mkcol(nullable=True))
|
|
self.verify([], self.mkcol(server_default=None), self.mkcol(server_default=None))
|
|
self.verify([], self.mkcol(server_default='42'), self.mkcol(server_default='42'))
|
|
|
|
# test server default
|
|
delta = self.verify(['server_default'], self.mkcol(), self.mkcol('id', String, DefaultClause('foobar')))
|
|
self.assertEqual(delta['server_default'].arg, 'foobar')
|
|
|
|
self.verify([], self.mkcol(server_default='foobar'), self.mkcol('id', String, DefaultClause('foobar')))
|
|
self.verify(['type'], self.mkcol(server_default='foobar'), self.mkcol('id', Text, DefaultClause('foobar')))
|
|
|
|
# test alter_metadata
|
|
col = self.mkcol(server_default='foobar')
|
|
self.verify(['type'], col, self.mkcol('id', Text, DefaultClause('foobar')), alter_metadata=True)
|
|
self.assert_(isinstance(col.type, Text))
|
|
|
|
col = self.mkcol()
|
|
self.verify(['name', 'server_default', 'type'], col, self.mkcol('beep', Text, DefaultClause('foobar')), alter_metadata=True)
|
|
self.assert_(isinstance(col.type, Text))
|
|
self.assertEqual(col.name, 'beep')
|
|
self.assertEqual(col.server_default.arg, 'foobar')
|
|
|
|
col = self.mkcol()
|
|
self.verify(['name', 'server_default', 'type'], col, self.mkcol('beep', Text, DefaultClause('foobar')), alter_metadata=False)
|
|
self.assertFalse(isinstance(col.type, Text))
|
|
self.assertNotEqual(col.name, 'beep')
|
|
self.assertFalse(col.server_default)
|
|
|
|
@fixture.usedb()
|
|
def test_deltas_zero_columns(self):
|
|
"""Testing ColumnDelta with zero columns"""
|
|
|
|
self.verify(['name'], 'ids', table=self.table, name='hey')
|
|
|
|
# test reflection
|
|
self.verify(['type'], 'ids', table=self.table.name, type=String(80), engine=self.engine)
|
|
self.verify(['type'], 'ids', table=self.table.name, type=String(80), metadata=self.meta)
|
|
|
|
# check if alter_metadata is respected
|
|
self.meta.clear()
|
|
delta = self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=True, metadata=self.meta)
|
|
self.assert_(self.table.name in self.meta)
|
|
self.assertEqual(delta.result_column.type.length, 80)
|
|
self.assertEqual(self.meta.tables.get(self.table.name).c.ids.type.length, 80)
|
|
|
|
self.meta.clear()
|
|
self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=False, engine=self.engine)
|
|
self.assert_(self.table.name not in self.meta)
|
|
|
|
self.meta.clear()
|
|
self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=False, metadata=self.meta)
|
|
self.assert_(self.table.name not in self.meta)
|
|
|
|
# test defaults
|
|
self.meta.clear()
|
|
self.verify(['server_default'], 'ids', table=self.table.name, server_default='foobar', alter_metadata=True, metadata=self.meta)
|
|
self.meta.tables.get(self.table.name).c.ids.server_default.arg == 'foobar'
|
|
|
|
# test missing parameters
|
|
self.assertRaises(ValueError, ColumnDelta, table=self.table.name)
|
|
self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=True)
|
|
self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=False)
|
|
|
|
def test_deltas_one_column(self):
|
|
"""Testing ColumnDelta with one column"""
|
|
col_orig = self.mkcol(primary_key=True)
|
|
|
|
self.verify([], col_orig)
|
|
self.verify(['name'], col_orig, 'ids')
|
|
# Parameters are always executed, even if they're 'unchanged'
|
|
# (We can't assume given column is up-to-date)
|
|
self.verify(['name', 'primary_key', 'type'], col_orig, 'id', Integer, primary_key=True)
|
|
self.verify(['name', 'primary_key', 'type'], col_orig, name='id', type=Integer, primary_key=True)
|
|
|
|
# Change name, given an up-to-date definition and the current name
|
|
delta = self.verify(['name'], col_orig, name='blah')
|
|
self.assertEquals(delta.get('name'), 'blah')
|
|
self.assertEquals(delta.current_name, 'id')
|
|
|
|
# check if alter_metadata is respected
|
|
col_orig = self.mkcol(primary_key=True)
|
|
self.verify(['name', 'type'], col_orig, name='id12', type=Text, alter_metadata=True)
|
|
self.assert_(isinstance(col_orig.type, Text))
|
|
self.assertEqual(col_orig.name, 'id12')
|
|
|
|
col_orig = self.mkcol(primary_key=True)
|
|
self.verify(['name', 'type'], col_orig, name='id12', type=Text, alter_metadata=False)
|
|
self.assert_(isinstance(col_orig.type, String))
|
|
self.assertEqual(col_orig.name, 'id')
|
|
|
|
# test server default
|
|
col_orig = self.mkcol(primary_key=True)
|
|
delta = self.verify(['server_default'], col_orig, DefaultClause('foobar'))
|
|
self.assertEqual(delta['server_default'].arg, 'foobar')
|
|
|
|
delta = self.verify(['server_default'], col_orig, server_default=DefaultClause('foobar'))
|
|
self.assertEqual(delta['server_default'].arg, 'foobar')
|
|
|
|
# no change
|
|
col_orig = self.mkcol(server_default=DefaultClause('foobar'))
|
|
delta = self.verify(['type'], col_orig, DefaultClause('foobar'), type=PickleType)
|
|
self.assert_(isinstance(delta.result_column.type, PickleType))
|
|
|
|
# TODO: test server on update
|
|
# TODO: test bind metadata
|