cqle: Update for 3.0 compatibility WIP

Mostly needs test updates
This commit is contained in:
Adam Holmberg
2015-09-18 17:06:05 -05:00
parent 3ea270b78b
commit 5e7807c167
5 changed files with 66 additions and 200 deletions

View File

@@ -211,7 +211,7 @@ def sync_table(model):
# check for an existing column family
if raw_cf_name not in tables:
log.debug("sync_table creating new table %s", cf_name)
qs = get_create_table(model)
qs = _get_create_table(model)
try:
execute(qs)
@@ -223,8 +223,7 @@ def sync_table(model):
else:
log.debug("sync_table checking existing table %s", cf_name)
# see if we're missing any columns
fields = get_fields(model)
field_names = [x.name for x in fields]
field_names = _get_non_pk_field_names(tables[raw_cf_name])
model_fields = set()
# TODO: does this work with db_name?
for name, col in model._columns.items():
@@ -242,7 +241,7 @@ def sync_table(model):
if db_fields_not_in_model:
log.info("Table %s has fields not referenced by model: %s", cf_name, db_fields_not_in_model)
update_compaction(model)
_update_options(model)
table = cluster.metadata.keyspaces[ks_name].tables[raw_cf_name]
@@ -331,9 +330,14 @@ def get_create_type(type_model, keyspace):
return type_meta.as_cql_query()
def get_create_table(model):
cf_name = model.column_family_name()
qs = ['CREATE TABLE {0}'.format(cf_name)]
def _get_create_table(model):
ks_table_name = model.column_family_name()
query_strings = ['CREATE TABLE {0}'.format(ks_table_name)]
# Need to use an existing table to make sure we're using the right
# metadata class
cluster = get_cluster()
existing_meta = cluster.metadata.keyspaces['system'].tables['local']
# add column types
pkeys = [] # primary keys
@@ -352,102 +356,30 @@ def get_create_table(model):
qtypes.append('PRIMARY KEY (({0}){1})'.format(', '.join(pkeys), ckeys and ', ' + ', '.join(ckeys) or ''))
qs += ['({0})'.format(', '.join(qtypes))]
query_strings += ['({0})'.format(', '.join(qtypes))]
with_qs = []
table_properties = ['bloom_filter_fp_chance', 'caching', 'comment',
'dclocal_read_repair_chance', 'default_time_to_live', 'gc_grace_seconds',
'index_interval', 'memtable_flush_period_in_ms', 'populate_io_cache_on_flush',
'read_repair_chance', 'replicate_on_write']
for prop_name in table_properties:
prop_value = getattr(model, '__{0}__'.format(prop_name), None)
if prop_value is not None:
# Strings need to be single-quoted
if isinstance(prop_value, six.string_types):
prop_value = "'{0}'".format(prop_value)
with_qs.append("{0} = {1}".format(prop_name, prop_value))
property_strings = []
_order = ['"{0}" {1}'.format(c.db_field_name, c.clustering_order or 'ASC') for c in model._clustering_keys.values()]
if _order:
with_qs.append('clustering order by ({0})'.format(', '.join(_order)))
property_strings.append('CLUSTERING ORDER BY ({0})'.format(', '.join(_order)))
compaction_options = get_compaction_options(model)
if compaction_options:
compaction_options = json.dumps(compaction_options).replace('"', "'")
with_qs.append("compaction = {0}".format(compaction_options))
property_strings += existing_meta._make_option_strings(model.__options__ or {})
# Add table properties.
if with_qs:
qs += ['WITH {0}'.format(' AND '.join(with_qs))]
if property_strings:
query_strings += ['WITH {0}'.format(' AND '.join(property_strings))]
qs = ' '.join(qs)
return qs
return ' '.join(query_strings)
def get_compaction_options(model):
"""
Generates dictionary (later converted to a string) for creating and altering
tables with compaction strategy
:param model:
:return:
"""
if not model.__compaction__:
return {}
result = {'class': model.__compaction__}
def setter(key, limited_to_strategy=None):
"""
sets key in result, checking if the key is limited to either SizeTiered or Leveled
:param key: one of the compaction options, like "bucket_high"
:param limited_to_strategy: SizeTieredCompactionStrategy, LeveledCompactionStrategy
:return:
"""
mkey = "__compaction_{0}__".format(key)
tmp = getattr(model, mkey)
if tmp and limited_to_strategy and limited_to_strategy != model.__compaction__:
raise CQLEngineException("{0} is limited to {1}".format(key, limited_to_strategy))
if tmp:
# Explicitly cast the values to strings to be able to compare the
# values against introspected values from Cassandra.
result[key] = str(tmp)
setter('tombstone_compaction_interval')
setter('tombstone_threshold')
setter('bucket_high', SizeTieredCompactionStrategy)
setter('bucket_low', SizeTieredCompactionStrategy)
setter('max_threshold', SizeTieredCompactionStrategy)
setter('min_threshold', SizeTieredCompactionStrategy)
setter('min_sstable_size', SizeTieredCompactionStrategy)
setter('sstable_size_in_mb', LeveledCompactionStrategy)
return result
def get_fields(model):
def _get_non_pk_field_names(table_meta):
# returns all fields that aren't part of the PK
ks_name = model._get_keyspace()
col_family = model._raw_column_family_name()
field_types = ['regular', 'static']
query = "select * from system.schema_columns where keyspace_name = %s and columnfamily_name = %s"
tmp = execute(query, [ks_name, col_family])
# Tables containing only primary keys do not appear to create
# any entries in system.schema_columns, as only non-primary-key attributes
# appear to be inserted into the schema_columns table
try:
return [Field(x['column_name'], x['validator']) for x in tmp if x['type'] in field_types]
except KeyError:
return [Field(x['column_name'], x['validator']) for x in tmp]
# convert to Field named tuples
pk_names = set(col.name for col in table_meta.primary_key)
field_names = [name for name in table_meta.columns.keys() if name not in pk_names]
return field_names
def get_table_settings(model):
def _get_table_metadata(model):
# returns the table as provided by the native driver for a given model
cluster = get_cluster()
ks = model._get_keyspace()
@@ -456,47 +388,26 @@ def get_table_settings(model):
return table
def update_compaction(model):
"""Updates the compaction options for the given model if necessary.
def _update_options(model):
"""Updates the table options for the given model if necessary.
:param model: The model to update.
:return: `True`, if the compaction options were modified in Cassandra,
:return: `True`, if the options were modified in Cassandra,
`False` otherwise.
:rtype: bool
"""
log.debug("Checking %s for compaction differences", model)
table = get_table_settings(model)
log.debug("Checking %s for option differences", model)
table_meta = _get_table_metadata(model)
existing_options = table_meta.options
existing_options = table.options.copy()
model_options = model.__options__ or {}
update_options = dict((k, v) for k, v in model_options if existing_options[k] != v)
existing_compaction_strategy = existing_options['compaction_strategy_class']
existing_options = json.loads(existing_options['compaction_strategy_options'])
desired_options = get_compaction_options(model)
desired_compact_strategy = desired_options.get('class', SizeTieredCompactionStrategy)
desired_options.pop('class', None)
do_update = False
if desired_compact_strategy not in existing_compaction_strategy:
do_update = True
for k, v in desired_options.items():
val = existing_options.pop(k, None)
if val != v:
do_update = True
# check compaction_strategy_options
if do_update:
options = get_compaction_options(model)
# jsonify
options = json.dumps(options).replace('"', "'")
cf_name = model.column_family_name()
query = "ALTER TABLE {0} with compaction = {1}".format(cf_name, options)
if update_options:
option_strings = table_meta._make_option_strings(update_options)
options = ' AND '.join(option_strings)
query = "ALTER TABLE {0} WITH {1}".format(model.column_family_name(), options)
execute(query)
return True

View File

@@ -310,46 +310,17 @@ class BaseModel(object):
__keyspace__ = None
__default_ttl__ = None
__polymorphic_key__ = None # DEPRECATED
__discriminator_value__ = None
# compaction options
__compaction__ = None
__compaction_tombstone_compaction_interval__ = None
__compaction_tombstone_threshold__ = None
__options__ = None
# compaction - size tiered options
__compaction_bucket_high__ = None
__compaction_bucket_low__ = None
__compaction_max_threshold__ = None
__compaction_min_threshold__ = None
__compaction_min_sstable_size__ = None
# compaction - leveled options
__compaction_sstable_size_in_mb__ = None
# end compaction
# the queryset class used for this class
__queryset__ = query.ModelQuerySet
__dmlquery__ = query.DMLQuery
__consistency__ = None # can be set per query
# Additional table properties
__bloom_filter_fp_chance__ = None
__caching__ = None
__comment__ = None
__dclocal_read_repair_chance__ = None
__default_time_to_live__ = None
__gc_grace_seconds__ = None
__index_interval__ = None
__memtable_flush_period_in_ms__ = None
__populate_io_cache_on_flush__ = None
__read_repair_chance__ = None
__replicate_on_write__ = None
_timestamp = None # optional timestamp to include with the operation (USING TIMESTAMP)
_if_not_exists = False # optional if_not_exists flag to check existence before insertion
@@ -791,6 +762,9 @@ class ModelMetaClass(type):
attrs['__discriminator_value__'] = attrs.get('__discriminator_value__', poly_key)
attrs['__polymorphic_key__'] = attrs['__discriminator_value__']
options = attrs.get('__options__') or {}
attrs['__default_ttl__'] = options.get('default_time_to_live')
def _transform_column(col_name, col_obj):
column_dict[col_name] = col_obj
if col_obj.primary_key:
@@ -965,11 +939,11 @@ class Model(BaseModel):
Sets the name of the keyspace used by this model.
"""
__default_ttl__ = None
__options__ = None
"""
*Optional* The default ttl used by this model.
*Optional* Table options applied with this model
This can be overridden by using the :meth:`~.ttl` method.
(e.g. compaction, default ttl, cache settings, etc.)
"""
__polymorphic_key__ = None

View File

@@ -2292,8 +2292,9 @@ class TableMetadataV3(TableMetadata):
for option in cls.option_maps:
value = options_copy.pop(option, {})
params = ("'%s': '%s'" % (k, v) for k, v in value.items())
ret.append("%s = {%s}" % (option, ', '.join(params)))
if value:
params = ("'%s': '%s'" % (k, v) for k, v in value.items())
ret.append("%s = {%s}" % (option, ', '.join(params)))
for name, value in options_copy.items():
if value is not None:

View File

@@ -18,7 +18,7 @@ from mock import patch
from cassandra.cqlengine import columns, SizeTieredCompactionStrategy, LeveledCompactionStrategy
from cassandra.cqlengine import CQLEngineException
from cassandra.cqlengine.management import get_compaction_options, drop_table, sync_table, get_table_settings
from cassandra.cqlengine.management import drop_table, sync_table, _get_table_metadata
from cassandra.cqlengine.models import Model
from tests.integration.cqlengine.base import BaseCassEngTestCase
@@ -64,34 +64,13 @@ class LeveledCompactionTest(BaseCompactionTest):
self.model = copy.deepcopy(CompactionLeveledStrategyModel)
def test_simple_leveled(self):
result = get_compaction_options(self.model)
assert result['class'] == LeveledCompactionStrategy
def test_bucket_high_fails(self):
self.assert_option_fails('bucket_high')
def test_bucket_low_fails(self):
self.assert_option_fails('bucket_low')
def test_max_threshold_fails(self):
self.assert_option_fails('max_threshold')
def test_min_threshold_fails(self):
self.assert_option_fails('min_threshold')
def test_min_sstable_size_fails(self):
self.assert_option_fails('min_sstable_size')
def test_sstable_size_in_mb(self):
with patch.object(self.model, '__compaction_sstable_size_in_mb__', 32):
result = get_compaction_options(self.model)
assert result['sstable_size_in_mb'] == '32'
class LeveledcompactionTestTable(Model):
__compaction__ = LeveledCompactionStrategy
__options__ = {} # TODO
__compaction_sstable_size_in_mb__ = 64
user_id = columns.UUID(primary_key=True)
@@ -203,13 +182,18 @@ class OptionsTest(BaseCassEngTestCase):
def test_all_size_tiered_options(self):
class AllSizeTieredOptionsModel(Model):
__compaction__ = SizeTieredCompactionStrategy
__compaction_bucket_low__ = .3
__compaction_bucket_high__ = 2
__compaction_min_threshold__ = 2
__compaction_max_threshold__ = 64
__compaction_tombstone_compaction_interval__ = 86400
#__options__ = {'compaction': {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy',
# 'bucket_low': '.3',
# 'bucket_high': '2',
# 'min_threshold': '2',
# 'max_threshold': '64',
# 'tombstone_compaction_interval': '86400'}}
__options__ = {'compaction': """{'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy',
'bucket_low': '.3',
'bucket_high': '2',
'min_threshold': '2',
'max_threshold': '64',
'tombstone_compaction_interval': '86400'}"""}
cid = columns.UUID(primary_key=True)
name = columns.Text()
@@ -217,18 +201,14 @@ class OptionsTest(BaseCassEngTestCase):
drop_table(AllSizeTieredOptionsModel)
sync_table(AllSizeTieredOptionsModel)
options = get_table_settings(AllSizeTieredOptionsModel).options['compaction_strategy_options']
options = json.loads(options)
table_meta = _get_table_metadata(AllSizeTieredOptionsModel)
cql = table_meta.export_as_string()
expected = table_meta._make_option_strings(AllSizeTieredOptionsModel.__options__)
for option in expected:
self.assertIn(option, cql)
expected = {u'min_threshold': u'2',
u'bucket_low': u'0.3',
u'tombstone_compaction_interval': u'86400',
u'bucket_high': u'2',
u'max_threshold': u'64'}
self.assertDictEqual(options, expected)
def test_all_leveled_options(self):
def xtest_all_leveled_options(self):
class AllLeveledOptionsModel(Model):

View File

@@ -41,7 +41,7 @@ class BaseTTLTest(BaseCassEngTestCase):
class TestDefaultTTLModel(Model):
__default_ttl__ = 20
__options__ = {'default_time_to_live': 20}
id = columns.UUID(primary_key=True, default=lambda:uuid4())
count = columns.Integer()
text = columns.Text(required=False)