Use util fns instead of inheritance for cql version
This commit is contained in:
@@ -24,29 +24,26 @@ if not os.path.exists(path):
|
||||
os.mkdir(path)
|
||||
|
||||
|
||||
class BaseTestCase(unittest.TestCase):
|
||||
def _get_cass_and_cql_version(self):
|
||||
"""
|
||||
Probe system.local table to determine Cassandra and CQL version.
|
||||
"""
|
||||
c = Cluster()
|
||||
s = c.connect()
|
||||
s.set_keyspace('system')
|
||||
row = s.execute('SELECT cql_version, release_version FROM local')[0]
|
||||
def get_server_versions():
|
||||
"""
|
||||
Probe system.local table to determine Cassandra and CQL version.
|
||||
Returns a tuple of (cassandra_version, cql_version).
|
||||
"""
|
||||
c = Cluster()
|
||||
s = c.connect()
|
||||
s.set_keyspace('system')
|
||||
row = s.execute('SELECT cql_version, release_version FROM local')[0]
|
||||
|
||||
cass_version = self._get_version_as_tuple(row.release_version)
|
||||
cql_version = self._get_version_as_tuple(row.cql_version)
|
||||
cass_version = _tuple_version(row.release_version)
|
||||
cql_version = _tuple_version(row.cql_version)
|
||||
|
||||
c.shutdown()
|
||||
c.shutdown()
|
||||
|
||||
result = {'cass_version': cass_version, 'cql_version': cql_version}
|
||||
return result
|
||||
return (cass_version, cql_version)
|
||||
|
||||
def _get_version_as_tuple(self, version):
|
||||
version = version.split('.')
|
||||
version = [int(p) for p in version]
|
||||
version = tuple(version)
|
||||
return version
|
||||
|
||||
def _tuple_version(version_string):
|
||||
return tuple([int(p) for p in version_string.split('.')])
|
||||
|
||||
|
||||
def get_cluster():
|
||||
|
||||
@@ -10,15 +10,15 @@ from uuid import uuid1, uuid4
|
||||
from blist import sortedset
|
||||
|
||||
from cassandra import InvalidRequest
|
||||
from cassandra.cluster import Cluster, NoHostAvailable
|
||||
from cassandra.cluster import Cluster
|
||||
|
||||
from tests.integration import BaseTestCase
|
||||
from tests.integration import get_server_versions
|
||||
|
||||
|
||||
class TypeTests(BaseTestCase):
|
||||
class TypeTests(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TypeTests, self).setUp()
|
||||
self._versions = self._get_cass_and_cql_version()
|
||||
self._cass_version, self._cql_version = get_server_versions()
|
||||
|
||||
def test_blob_type_as_string(self):
|
||||
c = Cluster()
|
||||
@@ -44,7 +44,7 @@ class TypeTests(BaseTestCase):
|
||||
|
||||
query = 'INSERT INTO mytable (a, b) VALUES (%s, %s)'
|
||||
|
||||
if self._versions['cql_version'] >= (3, 1, 0):
|
||||
if self._cql_version >= (3, 1, 0):
|
||||
# Blob values can't be specified using string notation in CQL 3.1.0 and
|
||||
# above which is used by default in Cassandra 2.0.
|
||||
msg = r'.*Invalid STRING constant \(.*?\) for b of type blob.*'
|
||||
@@ -213,9 +213,3 @@ class TypeTests(BaseTestCase):
|
||||
|
||||
for expected, actual in zip(expected_vals, results[0]):
|
||||
self.assertEquals(expected, actual)
|
||||
|
||||
def _get_version_as_tuple(self, version):
|
||||
version = version.split('.')
|
||||
version = [int(p) for p in version]
|
||||
version = tuple(version)
|
||||
return version
|
||||
|
||||
Reference in New Issue
Block a user