# Files
# 2017-04-24 17:20:57 -04:00
#
# 479 lines
# 16 KiB
# Python

# Copyright 2013-2017 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
import mock
import logging
from cassandra.cqlengine.connection import get_session, get_cluster
from cassandra.cqlengine import CQLEngineException
from cassandra.cqlengine import management
from cassandra.cqlengine.management import _get_table_metadata, sync_table, drop_table, sync_type
from cassandra.cqlengine.models import Model
from cassandra.cqlengine import columns
from tests.integration import PROTOCOL_VERSION, greaterthancass20, MockLoggingHandler, CASSANDRA_VERSION
from tests.integration.cqlengine.base import BaseCassEngTestCase
from tests.integration.cqlengine.query.test_queryset import TestModel
from cassandra.cqlengine.usertype import UserType
from tests.integration.cqlengine import DEFAULT_KEYSPACE
class KeyspaceManagementTest(BaseCassEngTestCase):
    """Exercises keyspace creation and removal through the management module."""

    def _check_create_then_drop(self, cluster, ks_name, create):
        """
        Create ``ks_name`` with ``create`` and then drop it, verifying the
        cluster metadata before and after each step.
        """
        self.assertNotIn(ks_name, cluster.metadata.keyspaces)
        create(ks_name)
        self.assertIn(ks_name, cluster.metadata.keyspaces)
        management.drop_keyspace(ks_name)
        self.assertNotIn(ks_name, cluster.metadata.keyspaces)

    def test_create_drop_succeeeds(self):
        """
        Runs the create/drop cycle once with a simple-strategy keyspace and
        once with a network-topology keyspace.
        """
        cluster = get_cluster()
        scenarios = (
            ('test_ks_ss',
             lambda ks: management.create_keyspace_simple(ks, 2)),
            ('test_ks_nts',
             lambda ks: management.create_keyspace_network_topology(ks, {'dc1': 1})),
        )
        for ks_name, create in scenarios:
            self._check_create_then_drop(cluster, ks_name, create)
class DropTableTest(BaseCassEngTestCase):
    """Verifies that dropping a table is safe to repeat."""

    def test_multiple_deletes_dont_fail(self):
        """Sync the table once, then drop it twice; the second drop must not raise."""
        sync_table(TestModel)
        for _attempt in range(2):
            drop_table(TestModel)
class LowercaseKeyModel(Model):
    """Model whose column names are all lower-case snake_case; synced and dropped by CapitalizedKeyTest."""
    first_key = columns.Integer(primary_key=True)
    second_key = columns.Integer(primary_key=True)
    some_data = columns.Text()
class CapitalizedKeyModel(Model):
    """Counterpart of LowercaseKeyModel whose column names are camelCase; synced and dropped by CapitalizedKeyTest."""
    firstKey = columns.Integer(primary_key=True)
    secondKey = columns.Integer(primary_key=True)
    someData = columns.Text()
class PrimaryKeysOnlyModel(Model):
    """
    Baseline model for the 'primary_keys_only' table: two Integer primary key
    columns and no other columns. SyncTableTests syncs this model first and
    then tries the variants below against the same table name.
    """
    __table_name__ = "primary_keys_only"
    __options__ = {'compaction': {'class': 'LeveledCompactionStrategy'}}
    first_key = columns.Integer(primary_key=True)
    second_key = columns.Integer(primary_key=True)
class PrimaryKeysModelChanged(Model):
    """
    Variant of PrimaryKeysOnlyModel (same table name) in which the first
    primary key column is named new_first_key instead of first_key.
    """
    __table_name__ = "primary_keys_only"
    __options__ = {'compaction': {'class': 'LeveledCompactionStrategy'}}
    new_first_key = columns.Integer(primary_key=True)
    second_key = columns.Integer(primary_key=True)
class PrimaryKeysModelTypeChanged(Model):
    """
    Variant of PrimaryKeysOnlyModel (same table name) in which first_key is
    declared as Float instead of Integer.
    """
    # NOTE(review): no test in the visible part of this file references this model.
    __table_name__ = "primary_keys_only"
    __options__ = {'compaction': {'class': 'LeveledCompactionStrategy'}}
    first_key = columns.Float(primary_key=True)
    second_key = columns.Integer(primary_key=True)
class PrimaryKeysRemovedPk(Model):
    """
    Variant of PrimaryKeysOnlyModel (same table name) that declares only
    second_key; first_key is absent.
    """
    __table_name__ = "primary_keys_only"
    __options__ = {'compaction': {'class': 'LeveledCompactionStrategy'}}
    second_key = columns.Integer(primary_key=True)
class PrimaryKeysAddedClusteringKey(Model):
    """
    Variant of PrimaryKeysOnlyModel (same table name) whose first primary key
    column is named new_first_key and typed Float.
    """
    # NOTE(review): the class name suggests an added clustering key, but the
    # declared columns only differ from the baseline in the first key's name
    # and type -- confirm this is the intended shape.
    __table_name__ = "primary_keys_only"
    __options__ = {'compaction': {'class': 'LeveledCompactionStrategy'}}
    new_first_key = columns.Float(primary_key=True)
    second_key = columns.Integer(primary_key=True)
class CapitalizedKeyTest(BaseCassEngTestCase):
    """Table creation for models with lower-case and capitalized column names."""

    def test_table_definition(self):
        """ Syncing a model with capitalized column names must succeed, just like the lower-case one """
        models = (LowercaseKeyModel, CapitalizedKeyModel)

        # Create both tables first, then remove them in the same order.
        for model in models:
            sync_table(model)
        for model in models:
            drop_table(model)
class FirstModel(Model):
    """Initial shape of the 'first_model' table used by AddColumnTest: one UUID key plus two data columns."""
    __table_name__ = 'first_model'
    first_key = columns.UUID(primary_key=True)
    second_key = columns.UUID()
    third_key = columns.Text()
class SecondModel(Model):
    """Same table as FirstModel with one additional Text column, fourth_key."""
    __table_name__ = 'first_model'
    first_key = columns.UUID(primary_key=True)
    second_key = columns.UUID()
    third_key = columns.Text()
    fourth_key = columns.Text()
class ThirdModel(Model):
    """Same table as SecondModel, but without fourth_key and with a new Map column, blah."""
    __table_name__ = 'first_model'
    first_key = columns.UUID(primary_key=True)
    second_key = columns.UUID()
    third_key = columns.Text()
    # fourth_key is no longer declared here; AddColumnTest expects the column
    # to remain in the database table regardless
    blah = columns.Map(columns.Text, columns.Text)
class FourthModel(Model):
    """Same table as ThirdModel, with the Map attribute renamed on the model side only."""
    __table_name__ = 'first_model'
    first_key = columns.UUID(primary_key=True)
    second_key = columns.UUID()
    third_key = columns.Text()
    # the model attribute is called 'renamed', but db_field maps it onto the
    # existing 'blah' column
    renamed = columns.Map(columns.Text, columns.Text, db_field='blah')
class AddColumnTest(BaseCassEngTestCase):
    """Checks how sync_table evolves the columns of the 'first_model' table."""

    def setUp(self):
        # FirstModel..FourthModel all share one table name, so dropping via
        # FirstModel gives every run a fresh table.
        drop_table(FirstModel)

    def test_add_column(self):
        """
        Sync four successive model versions against the same table and check
        the table metadata after each step: columns are added, a column no
        longer declared on the model stays in the table, and a renamed model
        attribute with db_field reuses the existing column.
        """
        # Initial table: metadata matches the model exactly.
        sync_table(FirstModel)
        meta_columns = _get_table_metadata(FirstModel).columns
        self.assertEqual(set(meta_columns), set(FirstModel._columns))

        # SecondModel adds fourth_key.
        sync_table(SecondModel)
        meta_columns = _get_table_metadata(FirstModel).columns
        self.assertEqual(set(meta_columns), set(SecondModel._columns))

        # ThirdModel drops fourth_key from the model and adds blah; the table
        # keeps fourth_key, so it ends up with one column more than the model.
        sync_table(ThirdModel)
        meta_columns = _get_table_metadata(FirstModel).columns
        self.assertEqual(len(meta_columns), 5)
        self.assertEqual(len(ThirdModel._columns), 4)
        self.assertIn('fourth_key', meta_columns)
        self.assertNotIn('fourth_key', ThirdModel._columns)
        self.assertIn('blah', ThirdModel._columns)
        self.assertIn('blah', meta_columns)

        # FourthModel renames the attribute but maps it to 'blah' via
        # db_field, so the table must be unchanged.
        sync_table(FourthModel)
        meta_columns = _get_table_metadata(FirstModel).columns
        self.assertEqual(len(meta_columns), 5)
        # Check the model that was just synced (previously this re-checked
        # ThirdModel, leaving FourthModel's column count unverified).
        self.assertEqual(len(FourthModel._columns), 4)
        self.assertIn('fourth_key', meta_columns)
        self.assertNotIn('fourth_key', FourthModel._columns)
        self.assertIn('renamed', FourthModel._columns)
        self.assertNotIn('renamed', meta_columns)
        self.assertIn('blah', meta_columns)
class ModelWithTableProperties(Model):
    """
    Single-column model carrying explicit table options. TablePropertiesTests
    reads and modifies __options__ on this class.
    """
    # Option values are given as strings here; test_set_table_properties
    # compares them against numeric values read back from the table metadata.
    __options__ = {'bloom_filter_fp_chance': '0.76328',
                   'comment': 'TxfguvBdzwROQALmQBOziRMbkqVGFjqcJfVhwGR',
                   'gc_grace_seconds': '2063',
                   'read_repair_chance': '0.17985',
                   'dclocal_read_repair_chance': '0.50811'}
    key = columns.UUID(primary_key=True)
class TablePropertiesTests(BaseCassEngTestCase):
    """Checks that table options declared on a model are applied by sync_table."""

    def setUp(self):
        # Start every test without the table so sync_table creates it anew.
        drop_table(ModelWithTableProperties)

    def test_set_table_properties(self):
        """Options declared on the model show up in the synced table's metadata."""
        sync_table(ModelWithTableProperties)
        expected = {'bloom_filter_fp_chance': 0.76328,
                    'comment': 'TxfguvBdzwROQALmQBOziRMbkqVGFjqcJfVhwGR',
                    'gc_grace_seconds': 2063,
                    'read_repair_chance': 0.17985,
                    # For some reason 'dclocal_read_repair_chance' in CQL is called
                    # just 'local_read_repair_chance' in the schema table.
                    # Source: https://issues.apache.org/jira/browse/CASSANDRA-6717
                    # TODO: due to a bug in the native driver i'm not seeing the local read repair chance show up
                    # 'local_read_repair_chance': 0.50811,
                    }
        options = management._get_table_metadata(ModelWithTableProperties).options
        # Compare only the keys we have expectations for.
        actual = {key: options.get(key) for key in expected}
        self.assertEqual(actual, expected)

    def test_table_property_update(self):
        """Changed option values on the model are reflected in the table after a sync."""
        model_options = ModelWithTableProperties.__options__
        # __options__ is a class-level dict shared by every test in this
        # module; snapshot it so the edits below do not leak into other tests.
        saved_options = dict(model_options)
        try:
            model_options['bloom_filter_fp_chance'] = 0.66778
            model_options['comment'] = 'xirAkRWZVVvsmzRvXamiEcQkshkUIDINVJZgLYSdnGHweiBrAiJdLJkVohdRy'
            model_options['gc_grace_seconds'] = 96362
            model_options['read_repair_chance'] = 0.2989
            model_options['dclocal_read_repair_chance'] = 0.12732
            sync_table(ModelWithTableProperties)
            table_options = management._get_table_metadata(ModelWithTableProperties).options
            self.assertDictContainsSubset(ModelWithTableProperties.__options__, table_options)
        finally:
            # Restore in place so any existing reference to the dict stays valid.
            model_options.clear()
            model_options.update(saved_options)

    def test_bogus_option_update(self):
        """Syncing with an unknown table option raises KeyError naming the option."""
        sync_table(ModelWithTableProperties)
        option = 'no way will this ever be an option'
        try:
            ModelWithTableProperties.__options__[option] = 'what was I thinking?'
            self.assertRaisesRegexp(KeyError, "Invalid table option.*%s.*" % option, sync_table, ModelWithTableProperties)
        finally:
            # Always remove the bogus key, even if the assertion fails.
            ModelWithTableProperties.__options__.pop(option, None)
class SyncTableTests(BaseCassEngTestCase):
    """sync_table behaviour for a table consisting solely of primary key columns."""

    def setUp(self):
        # All PrimaryKeys* models share this table; start each test without it.
        drop_table(PrimaryKeysOnlyModel)

    def test_sync_table_works_with_primary_keys_only_tables(self):
        """
        A primary-keys-only table can be created, and a later sync with a
        different compaction class updates the table definition.
        """
        sync_table(PrimaryKeysOnlyModel)
        # blows up with DoesNotExist if table does not exist
        table_meta = management._get_table_metadata(PrimaryKeysOnlyModel)
        self.assertIn('LeveledCompactionStrategy', table_meta.as_cql_query())

        compaction = PrimaryKeysOnlyModel.__options__['compaction']
        # The options dict lives on the class; remember the original strategy
        # so this test stays repeatable and does not affect other tests.
        original_class = compaction['class']
        try:
            compaction['class'] = 'SizeTieredCompactionStrategy'
            sync_table(PrimaryKeysOnlyModel)
            table_meta = management._get_table_metadata(PrimaryKeysOnlyModel)
            self.assertIn('SizeTieredCompactionStrategy', table_meta.as_cql_query())
        finally:
            compaction['class'] = original_class

    def test_primary_key_validation(self):
        """
        Test to ensure that changes to primary keys throw CQLEngineExceptions

        @since 3.2
        @jira_ticket PYTHON-532
        @expected_result Attempts to modify primary keys throw an exception

        @test_category object_mapper
        """
        sync_table(PrimaryKeysOnlyModel)
        # Each variant maps to the same table with a different primary key
        # declaration and must be rejected.
        self.assertRaises(CQLEngineException, sync_table, PrimaryKeysModelChanged)
        self.assertRaises(CQLEngineException, sync_table, PrimaryKeysAddedClusteringKey)
        self.assertRaises(CQLEngineException, sync_table, PrimaryKeysRemovedPk)
class IndexModel(Model):
    """Model for table 'index_model' with an indexed Text column (second_key); used by IndexTests."""
    __table_name__ = 'index_model'
    first_key = columns.UUID(primary_key=True)
    second_key = columns.Text(index=True)
class IndexCaseSensitiveModel(Model):
    """
    Same columns as IndexModel, but with a mixed-case table name and
    __table_name_case_sensitive__ enabled; used by IndexTests.
    """
    __table_name__ = 'IndexModel'
    __table_name_case_sensitive__ = True
    first_key = columns.UUID(primary_key=True)
    second_key = columns.Text(index=True)
class BaseInconsistent(Model):
    """Initial definition of the 'inconsistent' table: two indexed Integer columns next to the UUID key."""
    __table_name__ = 'inconsistent'
    first_key = columns.UUID(primary_key=True)
    second_key = columns.Integer(index=True)
    third_key = columns.Integer(index=True)
class ChangedInconsistent(Model):
    """
    Conflicting definition of the 'inconsistent' table: second_key is Text
    here (Integer in BaseInconsistent) and third_key is not declared.
    InconsistentTable syncs this after BaseInconsistent and expects a warning.
    """
    __table_name__ = 'inconsistent'
    __table_name_case_sensitive__ = True
    first_key = columns.UUID(primary_key=True)
    second_key = columns.Text(index=True)
class BaseInconsistentType(UserType):
    """Initial definition of user type 'type_inconsistent': Integer age and Text name."""
    __type_name__ = 'type_inconsistent'
    age = columns.Integer()
    name = columns.Text()
class ChangedInconsistentType(UserType):
    """
    Conflicting definition of user type 'type_inconsistent': name is Integer
    here (Text in BaseInconsistentType). InconsistentTable syncs this after
    the base type and expects a warning.
    """
    __type_name__ = 'type_inconsistent'
    age = columns.Integer()
    name = columns.Integer()
class InconsistentTable(BaseCassEngTestCase):
    """Checks the warnings logged when a sync meets a conflicting table or type definition."""

    def setUp(self):
        # NOTE(review): this drops IndexModel's table, while the test below
        # works on the 'inconsistent' table -- confirm whether BaseInconsistent
        # was meant here.
        drop_table(IndexModel)

    def test_sync_warnings(self):
        """
        Test to ensure when inconsistent changes are made to a table, or type as part of a sync call that the proper logging messages are surfaced

        @since 3.2
        @jira_ticket PYTHON-260
        @expected_result warnings are logged

        @test_category object_mapper
        """
        mock_handler = MockLoggingHandler()
        logger = logging.getLogger(management.__name__)
        logger.addHandler(mock_handler)
        try:
            # Column type conflict on an existing table.
            sync_table(BaseInconsistent)
            sync_table(ChangedInconsistent)
            self.assertIn('differing from the model type', mock_handler.messages.get('warning')[0])

            # NOTE(review): if CASSANDRA_VERSION is a plain string this is a
            # lexicographic comparison -- confirm against tests.integration.
            if CASSANDRA_VERSION >= '2.1':
                # Field type conflict on an existing user type.
                sync_type(DEFAULT_KEYSPACE, BaseInconsistentType)
                # Discard the messages captured so far so index 0 below refers
                # to the warning produced by the conflicting type sync.
                mock_handler.reset()
                sync_type(DEFAULT_KEYSPACE, ChangedInconsistentType)
                self.assertIn('differing from the model user type', mock_handler.messages.get('warning')[0])
        finally:
            # Detach the handler even when an assertion fails, so it does not
            # stay attached to the management logger for later tests.
            logger.removeHandler(mock_handler)
class TestIndexSetModel(Model):
    """Model whose Set, List, Map and Tuple columns are all declared with index=True; used by IndexTests.test_sync_indexed_set."""
    partition = columns.UUID(primary_key=True)
    int_set = columns.Set(columns.Integer, index=True)
    int_list = columns.List(columns.Integer, index=True)
    text_map = columns.Map(columns.Text, columns.DateTime, index=True)
    mixed_tuple = columns.Tuple(columns.Text, columns.Integer, columns.Text, index=True)
class IndexTests(BaseCassEngTestCase):
    """Checks that sync_table creates secondary indexes for columns declared with index=True."""

    def setUp(self):
        for model in (IndexModel, IndexCaseSensitiveModel):
            drop_table(model)

    def _assert_indexes_present(self, model, column_names):
        """
        Load the table metadata for ``model`` once and assert that an index
        name can be resolved for every column in ``column_names``.
        """
        table_meta = management._get_table_metadata(model)
        for column_name in column_names:
            index_name = management._get_index_name_by_column(table_meta, column_name)
            self.assertIsNotNone(index_name)

    def test_sync_index(self):
        """
        Syncs a model with an indexed column twice and checks that the index
        is reported in the table metadata after each sync

        @since 3.1
        @jira_ticket PYTHON-337
        @expected_result an index name is found for second_key after both syncs

        @test_category object_mapper
        """
        # The first pass creates the index; the second pass runs against the
        # already existing index.
        for _pass in range(2):
            sync_table(IndexModel)
            self._assert_indexes_present(IndexModel, ['second_key'])

    def test_sync_index_case_sensitive(self):
        """
        Same as test_sync_index, but for a model with a case sensitive,
        mixed-case table name

        @since 3.1
        @jira_ticket PYTHON-337
        @expected_result an index name is found for second_key after both syncs

        @test_category object_mapper
        """
        # The first pass creates the index; the second pass runs against the
        # already existing index.
        for _pass in range(2):
            sync_table(IndexCaseSensitiveModel)
            self._assert_indexes_present(IndexCaseSensitiveModel, ['second_key'])

    @greaterthancass20
    def test_sync_indexed_set(self):
        """
        Tests that models that have container types with indices can be synced.

        @since 3.2
        @jira_ticket PYTHON-533
        @expected_result table_sync should complete without a server error.

        @test_category object_mapper
        """
        sync_table(TestIndexSetModel)
        self._assert_indexes_present(
            TestIndexSetModel,
            ['int_set', 'int_list', 'text_map', 'mixed_tuple'],
        )
class NonModelFailureTest(BaseCassEngTestCase):
    """sync_table must reject classes that are not cqlengine models."""

    class FakeModel(object):
        """Plain object subclass standing in for a model."""

    def test_failure(self):
        """Passing a non-model class to sync_table raises CQLEngineException."""
        self.assertRaises(CQLEngineException, sync_table, self.FakeModel)
class StaticColumnTests(BaseCassEngTestCase):
    """Checks the CQL issued by sync_table for a model with a static column."""

    def test_static_columns(self):
        """
        The first sync must emit a statement declaring the column as static;
        syncing an up-to-date table must not execute anything.
        """
        if PROTOCOL_VERSION < 2:
            raise unittest.SkipTest("Native protocol 2+ required, currently using: {0}".format(PROTOCOL_VERSION))

        class StaticModel(Model):
            id = columns.Integer(primary_key=True)
            c = columns.Integer(primary_key=True)
            name = columns.Text(static=True)

        drop_table(StaticModel)
        session = get_session()

        # Spy on session.execute while still letting the real call go through.
        with mock.patch.object(session, "execute", wraps=session.execute) as create_spy:
            sync_table(StaticModel)

        self.assertGreater(create_spy.call_count, 0)
        last_positional_args = create_spy.call_args[0]
        issued_cql = last_positional_args[0].query_string
        self.assertIn('"name" text static', issued_cql)

        # once the table is in place, a further sync must not issue an alter
        # for the static column
        sync_table(StaticModel)

        with mock.patch.object(session, "execute", wraps=session.execute) as resync_spy:
            sync_table(StaticModel)

        recorded_calls = resync_spy.call_args_list
        self.assertEqual(len(recorded_calls), 0)