Add User Definted Types to mapper.

First pass; requires tests, further documentation
This commit is contained in:
Adam Holmberg
2015-03-03 13:02:35 -06:00
parent ef72d2fa75
commit ae568b4b7e
10 changed files with 343 additions and 18 deletions

View File

@@ -251,6 +251,11 @@ class Column(object):
static = "static" if self.static else ""
return '{} {} {}'.format(self.cql, self.db_type, static)
# TODO: make columns use cqltypes under the hood
# until then, this bridges the gap in using types along with cassandra.metadata for CQL generation
def cql_parameterized_type(self):
return self.db_type
def set_column_name(self, name):
"""
Sets the column name during document class construction
@@ -279,6 +284,10 @@ class Column(object):
""" determines if the given value equates to a null value for the given column type """
return val is None
@property
def sub_columns(self):
return []
class Blob(Column):
"""
@@ -671,6 +680,10 @@ class BaseContainerColumn(Column):
def _val_is_null(self, val):
return not val
@property
def sub_columns(self):
return [self.value_col]
class BaseContainerQuoter(ValueQuoter):
@@ -841,6 +854,37 @@ class Map(BaseContainerColumn):
return value
return self.Quoter({self.key_col.to_database(k): self.value_col.to_database(v) for k, v in value.items()})
@property
def sub_columns(self):
return [self.key_col, self.value_col]
class UserDefinedType(Column):
"""
User Defined Type column
http://www.datastax.com/documentation/cql/3.1/cql/cql_using/cqlUseUDT.html
"""
def __init__(self, user_type, **kwargs):
"""
:param type user_type: specifies a :class:`~.Type` model for the column
"""
self.user_type = user_type
self.db_type = "frozen<%s>" % user_type.type_name()
super(UserDefinedType, self).__init__(**kwargs)
@property
def sub_columns(self):
return list(self.user_type._fields.values())
def resolve_udts(col_def, out_list):
for col in col_def.sub_columns:
resolve_udts(col, out_list)
if isinstance(col_def, UserDefinedType):
out_list.append(col_def.user_type)
class _PartitionKeysToken(Column):
"""

View File

@@ -17,7 +17,7 @@ import logging
import six
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster, _NOT_SET, NoHostAvailable
from cassandra.cluster import Cluster, _NOT_SET, NoHostAvailable, UserTypeDoesNotExist
from cassandra.query import SimpleStatement, Statement, dict_factory
from cassandra.cqlengine import CQLEngineException
@@ -36,6 +36,12 @@ lazy_connect_args = None
default_consistency_level = ConsistencyLevel.ONE
# Because type models may be registered before a connection is present,
# and because sessions may be replaced, we must register UDTs here, in order
# to have them registered when a new session is established.
udt_by_keyspace = {}
class UndefinedKeyspaceException(CQLEngineException):
pass
@@ -54,6 +60,8 @@ def default():
session = cluster.connect()
session.row_factory = dict_factory
_register_known_types(cluster)
log.debug("cqlengine connection initialized with default session to localhost")
@@ -74,6 +82,8 @@ def set_session(s):
session = s
cluster = s.cluster
_register_known_types(cluster)
log.debug("cqlengine connection initialized with %s", s)
@@ -129,6 +139,8 @@ def setup(
raise
session.row_factory = dict_factory
_register_known_types(cluster)
def execute(query, params=None, consistency_level=None, timeout=NOT_SET):
@@ -178,3 +190,23 @@ def handle_lazy_connect():
hosts, kwargs = lazy_connect_args
lazy_connect_args = None
setup(hosts, **kwargs)
def register_udt(keyspace, type_name, klass):
try:
udt_by_keyspace[keyspace][type_name] = klass
except KeyError:
udt_by_keyspace[keyspace] = {type_name: klass}
global cluster
if cluster:
cluster.register_user_type(keyspace, type_name, klass)
def _register_known_types(cluster):
for ks_name, name_type_map in udt_by_keyspace.items():
for type_name, klass in name_type_map.items():
try:
cluster.register_user_type(ks_name, type_name, klass)
except UserTypeDoesNotExist:
pass # new types are covered in management sync functions

View File

@@ -19,11 +19,13 @@ import os
import six
import warnings
from cassandra.metadata import KeyspaceMetadata
from cassandra import metadata
from cassandra.cqlengine import CQLEngineException, SizeTieredCompactionStrategy, LeveledCompactionStrategy
from cassandra.cqlengine import columns
from cassandra.cqlengine.connection import execute, get_cluster
from cassandra.cqlengine.models import Model
from cassandra.cqlengine.named import NamedTable
from cassandra.cqlengine.usertype import UserType
CQLENG_ALLOW_SCHEMA_MANAGEMENT = 'CQLENG_ALLOW_SCHEMA_MANAGEMENT'
@@ -132,7 +134,7 @@ def _create_keyspace(name, durable_writes, strategy_class, strategy_options):
if name not in cluster.metadata.keyspaces:
log.info("Creating keyspace %s ", name)
ks_meta = KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options)
ks_meta = metadata.KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options)
execute(ks_meta.as_cql_query())
else:
log.info("Not creating keyspace %s because it already exists", name)
@@ -168,6 +170,8 @@ def sync_table(model):
"""
Inspects the model and creates / updates the corresponding table and columns.
Any User Defined Types used in the table are implicitly synchronized.
This function can only add fields that are not part of the primary key.
Note that the attributes removed from the model are not deleted on the database.
@@ -198,6 +202,13 @@ def sync_table(model):
keyspace = cluster.metadata.keyspaces[ks_name]
tables = keyspace.tables
syncd_types = set()
for col in model._columns.values():
udts = []
columns.resolve_udts(col, udts)
for udt in [u for u in udts if u not in syncd_types]:
_sync_type(ks_name, udt, syncd_types)
# check for an existing column family
if raw_cf_name not in tables:
log.debug("sync_table creating new table %s", cf_name)
@@ -216,6 +227,7 @@ def sync_table(model):
fields = get_fields(model)
field_names = [x.name for x in fields]
model_fields = set()
# # TODO: does this work with db_name??
for name, col in model._columns.items():
if col.primary_key or col.partition_key:
continue # we can't mess with the PK
@@ -248,6 +260,77 @@ def sync_table(model):
execute(qs)
def sync_type(ks_name, type_model):
"""
Inspects the type_model and creates / updates the corresponding type.
Note that the attributes removed from the type_model are not deleted on the database (this operation is not supported).
They become effectively ignored by (will not show up on) the type_model.
**This function should be used with caution, especially in production environments.
Take care to execute schema modifications in a single context (i.e. not concurrently with other clients).**
*There are plans to guard schema-modifying functions with an environment-driven conditional.*
"""
if not _allow_schema_modification():
return
if not issubclass(type_model, UserType):
raise CQLEngineException("Types must be derived from base UserType.")
_sync_type(ks_name, type_model)
def _sync_type(ks_name, type_model, omit_subtypes=None):
syncd_sub_types = omit_subtypes or set()
for field in type_model._fields.values():
udts = []
columns.resolve_udts(field, udts)
for udt in [u for u in udts if u not in syncd_sub_types]:
_sync_type(ks_name, udt, syncd_sub_types)
syncd_sub_types.add(udt)
type_name = type_model.type_name()
type_name_qualified = "%s.%s" % (ks_name, type_name)
cluster = get_cluster()
keyspace = cluster.metadata.keyspaces[ks_name]
defined_types = keyspace.user_types
if type_name not in defined_types:
log.debug("sync_type creating new type %s", type_name_qualified)
cql = get_create_type(type_model, ks_name)
execute(cql)
type_model.register_for_keyspace(ks_name)
else:
defined_fields = defined_types[type_name].field_names
model_fields = set()
for field in type_model._fields.values():
model_fields.add(field.db_field_name)
if field.db_field_name not in defined_fields:
execute("ALTER TYPE {} ADD {}".format(type_name_qualified, field.get_column_def()))
if len(defined_fields) == len(model_fields):
log.info("Type %s did not require synchronization", type_name_qualified)
return
db_fields_not_in_model = model_fields.symmetric_difference(defined_fields)
if db_fields_not_in_model:
log.info("Type %s has fields not referenced by model: %s", type_name_qualified, db_fields_not_in_model)
type_model.register_for_keyspace(ks_name)
def get_create_type(type_model, keyspace):
type_meta = metadata.UserType(keyspace,
type_model.type_name(),
(f.db_field_name for f in type_model._fields.values()),
type_model._fields.values())
return type_meta.as_cql_query()
def get_create_table(model):
cf_name = model.column_family_name()
qs = ['CREATE TABLE {}'.format(cf_name)]
@@ -439,11 +522,12 @@ def drop_table(model):
raw_cf_name = model.column_family_name(include_keyspace=False)
try:
table = meta.keyspaces[ks_name].tables[raw_cf_name]
meta.keyspaces[ks_name].tables[raw_cf_name]
execute('drop table {};'.format(model.column_family_name(include_keyspace=True)))
except KeyError:
pass
def _allow_schema_modification():
if not os.getenv(CQLENG_ALLOW_SCHEMA_MANAGEMENT):
msg = CQLENG_ALLOW_SCHEMA_MANAGEMENT + " environment variable is not set. Future versions of this package will require this variable to enable management functions."

View File

@@ -19,7 +19,8 @@ import warnings
from cassandra.cqlengine import CQLEngineException, ValidationError
from cassandra.cqlengine import columns
from cassandra.cqlengine.query import ModelQuerySet, DMLQuery, AbstractQueryableColumn, NOT_SET
from cassandra.cqlengine import connection
from cassandra.cqlengine import query
from cassandra.cqlengine.query import DoesNotExist as _DoesNotExist
from cassandra.cqlengine.query import MultipleObjectsReturned as _MultipleObjectsReturned
from cassandra.util import OrderedDict
@@ -211,7 +212,7 @@ class ConsistencyDescriptor(object):
raise NotImplementedError
class ColumnQueryEvaluator(AbstractQueryableColumn):
class ColumnQueryEvaluator(query.AbstractQueryableColumn):
"""
Wraps a column and allows it to be used in comparator
expressions, returning query operators
@@ -330,8 +331,8 @@ class BaseModel(object):
# end compaction
# the queryset class used for this class
__queryset__ = ModelQuerySet
__dmlquery__ = DMLQuery
__queryset__ = query.ModelQuerySet
__dmlquery__ = query.DMLQuery
__consistency__ = None # can be set per query
@@ -371,7 +372,7 @@ class BaseModel(object):
# that update should be used when persisting changes
self._is_persisted = False
self._batch = None
self._timeout = NOT_SET
self._timeout = connection.NOT_SET
def __repr__(self):
"""
@@ -741,7 +742,7 @@ class BaseModel(object):
return cls.objects.batch(batch)
def _inst_batch(self, batch):
assert self._timeout is NOT_SET, 'Setting both timeout and batch is not supported'
assert self._timeout is connection.NOT_SET, 'Setting both timeout and batch is not supported'
self._batch = batch
return self
@@ -919,6 +920,13 @@ class ModelMetaClass(type):
# create the class and add a QuerySet to it
klass = super(ModelMetaClass, cls).__new__(cls, name, bases, attrs)
udts = []
for col in column_dict.values():
columns.resolve_udts(col, udts)
for user_type in set(udts):
user_type.register_for_keyspace(klass._get_keyspace())
return klass

View File

@@ -18,7 +18,7 @@ import time
import six
from cassandra.cqlengine import columns, CQLEngineException, ValidationError, UnicodeMixin
from cassandra.cqlengine.connection import execute, NOT_SET
from cassandra.cqlengine import connection
from cassandra.cqlengine.functions import Token, BaseQueryFunction, QueryValue
from cassandra.cqlengine.operators import (InOperator, EqualsOperator, GreaterThanOperator,
GreaterThanOrEqualOperator, LessThanOperator,
@@ -117,7 +117,7 @@ class BatchQuery(object):
_consistency = None
def __init__(self, batch_type=None, timestamp=None, consistency=None, execute_on_exception=False,
timeout=NOT_SET):
timeout=connection.NOT_SET):
"""
:param batch_type: (optional) One of batch type values available through BatchType enum
:type batch_type: str or None
@@ -211,7 +211,7 @@ class BatchQuery(object):
query_list.append('APPLY BATCH;')
tmp = execute('\n'.join(query_list), parameters, self._consistency, self._timeout)
tmp = connection.execute('\n'.join(query_list), parameters, self._consistency, self._timeout)
check_applied(tmp)
self.queries = []
@@ -264,7 +264,7 @@ class AbstractQuerySet(object):
self._consistency = None
self._timestamp = None
self._if_not_exists = False
self._timeout = NOT_SET
self._timeout = connection.NOT_SET
@property
def column_family_name(self):
@@ -274,7 +274,7 @@ class AbstractQuerySet(object):
if self._batch:
return self._batch.add_query(q)
else:
result = execute(q, consistency_level=self._consistency, timeout=self._timeout)
result = connection.execute(q, consistency_level=self._consistency, timeout=self._timeout)
if self._transaction:
check_applied(result)
return result
@@ -1023,7 +1023,7 @@ class DMLQuery(object):
_if_not_exists = False
def __init__(self, model, instance=None, batch=None, ttl=None, consistency=None, timestamp=None,
if_not_exists=False, transaction=None, timeout=NOT_SET):
if_not_exists=False, transaction=None, timeout=connection.NOT_SET):
self.model = model
self.column_family_name = self.model.column_family_name()
self.instance = instance
@@ -1039,7 +1039,7 @@ class DMLQuery(object):
if self._batch:
return self._batch.add_query(q)
else:
tmp = execute(q, consistency_level=self._consistency, timeout=self._timeout)
tmp = connection.execute(q, consistency_level=self._consistency, timeout=self._timeout)
if self._if_not_exists or self._transaction:
check_applied(tmp)
return tmp

View File

@@ -0,0 +1,144 @@
import re
import six
from cassandra.cluster import UserTypeDoesNotExist
from cassandra.util import OrderedDict
from cassandra.cqlengine import CQLEngineException
from cassandra.cqlengine import columns
from cassandra.cqlengine import connection
from cassandra.cqlengine import models
class UserTypeException(CQLEngineException):
pass
class UserTypeDefinitionException(UserTypeException):
pass
class BaseUserType(object):
"""
The base type class; don't inherit from this, inherit from UserType, defined below
"""
__type_name__ = None
_fields = None
_db_map = None
def __init__(self, **values):
self._values = {}
for name, field in self._fields.items():
value = values.get(name, None)
if value is not None or isinstance(field, columns.BaseContainerColumn):
value = field.to_python(value)
value_mngr = field.value_manager(self, field, value)
if name in values:
value_mngr.explicit = True
self._values[name] = value_mngr
def __eq__(self, other):
print self.__class__, other.__class__
if self.__class__ != other.__class__:
return False
keys = set(self._fields.keys())
other_keys = set(other._fields.keys())
if keys != other_keys:
return False
for key in other_keys:
if getattr(self, key, None) != getattr(other, key, None):
return False
return True
def __ne__(self, other):
return not self.__eq__(other)
@classmethod
def register_for_keyspace(cls, keyspace):
connection.register_udt(keyspace, cls.type_name(), cls)
@classmethod
def type_name(cls):
"""
Returns the type name if it's been defined
otherwise, it creates it from the class name
"""
if cls.__type_name__:
type_name = cls.__type_name__.lower()
else:
camelcase = re.compile(r'([a-z])([A-Z])')
ccase = lambda s: camelcase.sub(lambda v: '{}_{}'.format(v.group(1), v.group(2)), s)
type_name = ccase(cls.__name__)
# trim to less than 48 characters or cassandra will complain
type_name = type_name[-48:]
type_name = type_name.lower()
type_name = re.sub(r'^_+', '', type_name)
cls.__type_name__ = type_name
return type_name
def validate(self):
"""
Cleans and validates the field values
"""
pass
for name, field in self._fields.items():
v = getattr(self, name)
if v is None and not self._values[name].explicit and field.has_default:
v = field.get_default()
val = field.validate(v)
setattr(self, name, val)
class UserTypeMetaClass(type):
def __new__(cls, name, bases, attrs):
field_dict = OrderedDict()
field_defs = [(k, v) for k, v in attrs.items() if isinstance(v, columns.Column)]
field_defs = sorted(field_defs, key=lambda x: x[1].position)
# TODO: this plus more validation
#counter_columns = [c for c in defined_columns.values() if isinstance(c, columns.Counter)]
#if counter_columns and data_columns:
# raise ModelDefinitionException('counter models may not have data columns')
def _transform_column(field_name, field_obj):
field_dict[field_name] = field_obj
field_obj.set_column_name(field_name)
attrs[field_name] = models.ColumnDescriptor(field_obj)
# transform field definitions
for k, v in field_defs:
# don't allow a field with the same name as a built-in attribute or method
if k in BaseUserType.__dict__:
raise UserTypeDefinitionException("field '{}' conflicts with built-in attribute/method".format(k))
_transform_column(k, v)
# create db_name -> model name map for loading
db_map = {}
for field_name, field in field_dict.items():
db_map[field.db_field_name] = field_name
attrs['_fields'] = field_dict
attrs['_db_map'] = db_map
klass = super(UserTypeMetaClass, cls).__new__(cls, name, bases, attrs)
return klass
@six.add_metaclass(UserTypeMetaClass)
class UserType(BaseUserType):
__type_name__ = None
"""
*Optional.* Sets the name of the CQL type for this type.
If not specified, the type name will be the name of the class, with it's module name as it's prefix.
"""

View File

@@ -13,5 +13,7 @@ A collection of functions for managing keyspace and table schema.
.. autofunction:: sync_table
.. autofunction:: sync_type
.. autofunction:: drop_table

View File

@@ -0,0 +1,10 @@
``cassandra.cqlengine.usertype`` - Model classes for User Defined Types
=======================================================================
.. module:: cassandra.cqlengine.usertype
UserType
--------
.. autoclass:: UserType
.. autoattribute:: __type_name__

View File

@@ -37,3 +37,4 @@ Object Mapper
cassandra/cqlengine/query
cassandra/cqlengine/connection
cassandra/cqlengine/management
cassandra/cqlengine/usertype

View File

@@ -3,7 +3,7 @@ Frequently Asked Questions
==========================
Q: Why don't updates work correctly on models instantiated as Model(field=value, field2=value2)?
----------------------------------------------------------------------------------------------
------------------------------------------------------------------------------------------------
A: The recommended way to create new rows is with the models .create method. The values passed into a model's init method are interpreted by the model as the values as they were read from a row. This allows the model to "know" which rows have changed since the row was read out of cassandra, and create suitable update statements.