Add UserType.as_cql_query, unit test coverage
This commit is contained in:
@@ -607,11 +607,33 @@ class KeyspaceMetadata(object):
|
||||
|
||||
|
||||
class UserType(object):
|
||||
"""
|
||||
A user defined type, as created by ``CREATE TYPE`` statements.
|
||||
|
||||
User-defined types were introduced in Cassandra 2.1.
|
||||
|
||||
.. versionadded:: 2.1.0
|
||||
"""
|
||||
|
||||
keyspace = None
|
||||
"""
|
||||
The string name of the keyspace in which this type is defined.
|
||||
"""
|
||||
|
||||
name = None
|
||||
"""
|
||||
The name of this type.
|
||||
"""
|
||||
|
||||
field_names = None
|
||||
"""
|
||||
An ordered list of the names for each field in this user-defined type.
|
||||
"""
|
||||
|
||||
field_types = None
|
||||
"""
|
||||
An ordered list of the types for each field in this user-defined type.
|
||||
"""
|
||||
|
||||
def __init__(self, keyspace, name, field_names, field_types):
|
||||
self.keyspace = keyspace
|
||||
@@ -619,6 +641,32 @@ class UserType(object):
|
||||
self.field_names = field_names
|
||||
self.field_types = field_types
|
||||
|
||||
def as_cql_query(self, formatted=False):
|
||||
"""
|
||||
Returns a CQL query that can be used to recreate this type.
|
||||
If `formatted` is set to :const:`True`, extra whitespace will
|
||||
be added to make the query more readable.
|
||||
"""
|
||||
ret = "CREATE TYPE %s.%s (%s" % (
|
||||
protect_name(self.keyspace),
|
||||
protect_name(self.name),
|
||||
"\n" if formatted else "")
|
||||
|
||||
if formatted:
|
||||
field_join = ",\n"
|
||||
padding = " "
|
||||
else:
|
||||
field_join = ", "
|
||||
padding = ""
|
||||
|
||||
fields = []
|
||||
for field_name, field_type in zip(self.field_names, self.field_types):
|
||||
fields.append("%s %s" % (protect_name(field_name), field_type.cql_parameterized_type()))
|
||||
|
||||
ret += field_join.join("%s%s" % (padding, field) for field in fields)
|
||||
ret += "\n)" if formatted else ")"
|
||||
return ret
|
||||
|
||||
|
||||
class TableMetadata(object):
|
||||
"""
|
||||
|
||||
@@ -18,11 +18,13 @@ except ImportError:
|
||||
import unittest # noqa
|
||||
|
||||
import cassandra
|
||||
from cassandra.cqltypes import IntegerType, AsciiType, TupleType
|
||||
from cassandra.metadata import (Murmur3Token, MD5Token,
|
||||
BytesToken, ReplicationStrategy,
|
||||
NetworkTopologyStrategy, SimpleStrategy,
|
||||
LocalStrategy, NoMurmur3, protect_name,
|
||||
protect_names, protect_value, is_valid_name)
|
||||
protect_names, protect_value, is_valid_name,
|
||||
UserType)
|
||||
from cassandra.policies import SimpleConvictionPolicy
|
||||
from cassandra.pool import Host
|
||||
|
||||
@@ -248,3 +250,21 @@ class TestTokens(unittest.TestCase):
|
||||
self.fail('Tokens for ByteOrderedPartitioner should be only strings')
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
|
||||
class TestUserTypes(unittest.TestCase):
|
||||
|
||||
def test_as_cql_query(self):
|
||||
field_types = [IntegerType, AsciiType, TupleType.apply_parameters([IntegerType, AsciiType])]
|
||||
udt = UserType("ks1", "mytype", ["a", "b", "c"], field_types)
|
||||
self.assertEqual("CREATE TYPE ks1.mytype (a varint, b ascii, c tuple<varint, ascii>)", udt.as_cql_query(formatted=False))
|
||||
|
||||
self.assertEqual("""CREATE TYPE ks1.mytype (
|
||||
a varint,
|
||||
b ascii,
|
||||
c tuple<varint, ascii>
|
||||
)""", udt.as_cql_query(formatted=True))
|
||||
|
||||
def test_as_cql_query_name_escaping(self):
|
||||
udt = UserType("MyKeyspace", "MyType", ["AbA", "keyspace"], [AsciiType, AsciiType])
|
||||
self.assertEqual('CREATE TYPE "MyKeyspace"."MyType" ("AbA" ascii, "keyspace" ascii)', udt.as_cql_query(formatted=False))
|
||||
|
||||
Reference in New Issue
Block a user