Added some additional logging around connection setup and schema management.
This commit is contained in:
@@ -45,10 +45,16 @@ def default():
|
|||||||
(except for row_factory)
|
(except for row_factory)
|
||||||
"""
|
"""
|
||||||
global cluster, session
|
global cluster, session
|
||||||
|
|
||||||
|
if session:
|
||||||
|
log.warn("configuring new connection for cqlengine when one was already set")
|
||||||
|
|
||||||
cluster = Cluster()
|
cluster = Cluster()
|
||||||
session = cluster.connect()
|
session = cluster.connect()
|
||||||
session.row_factory = dict_factory
|
session.row_factory = dict_factory
|
||||||
|
|
||||||
|
log.debug("cqlengine connection initialized with default session to localhost")
|
||||||
|
|
||||||
|
|
||||||
def set_session(s):
|
def set_session(s):
|
||||||
"""
|
"""
|
||||||
@@ -58,11 +64,17 @@ def set_session(s):
|
|||||||
This may be relaxed in the future
|
This may be relaxed in the future
|
||||||
"""
|
"""
|
||||||
global cluster, session
|
global cluster, session
|
||||||
|
|
||||||
|
if session:
|
||||||
|
log.warn("configuring new connection for cqlengine when one was already set")
|
||||||
|
|
||||||
if s.row_factory is not dict_factory:
|
if s.row_factory is not dict_factory:
|
||||||
raise CQLEngineException("Failed to initialize: 'Session.row_factory' must be 'dict_factory'.")
|
raise CQLEngineException("Failed to initialize: 'Session.row_factory' must be 'dict_factory'.")
|
||||||
session = s
|
session = s
|
||||||
cluster = s.cluster
|
cluster = s.cluster
|
||||||
|
|
||||||
|
log.debug("cqlengine connection initialized with %s", s)
|
||||||
|
|
||||||
|
|
||||||
def setup(
|
def setup(
|
||||||
hosts,
|
hosts,
|
||||||
@@ -104,8 +116,10 @@ def setup(
|
|||||||
cluster = Cluster(hosts, **kwargs)
|
cluster = Cluster(hosts, **kwargs)
|
||||||
try:
|
try:
|
||||||
session = cluster.connect()
|
session = cluster.connect()
|
||||||
|
log.debug("cqlengine connection initialized with internally created session")
|
||||||
except NoHostAvailable:
|
except NoHostAvailable:
|
||||||
if retry_connect:
|
if retry_connect:
|
||||||
|
log.warn("connect failed, setting up for re-attempt on first use")
|
||||||
kwargs['default_keyspace'] = default_keyspace
|
kwargs['default_keyspace'] = default_keyspace
|
||||||
kwargs['consistency'] = consistency
|
kwargs['consistency'] = consistency
|
||||||
kwargs['lazy_connect'] = False
|
kwargs['lazy_connect'] = False
|
||||||
@@ -157,6 +171,7 @@ def get_cluster():
|
|||||||
def handle_lazy_connect():
|
def handle_lazy_connect():
|
||||||
global lazy_connect_args
|
global lazy_connect_args
|
||||||
if lazy_connect_args:
|
if lazy_connect_args:
|
||||||
|
log.debug("lazy connect")
|
||||||
hosts, kwargs = lazy_connect_args
|
hosts, kwargs = lazy_connect_args
|
||||||
lazy_connect_args = None
|
lazy_connect_args = None
|
||||||
setup(hosts, **kwargs)
|
setup(hosts, **kwargs)
|
||||||
|
|||||||
@@ -12,6 +12,7 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
class CQLEngineException(Exception):
|
class CQLEngineException(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|||||||
@@ -130,12 +130,14 @@ def _create_keyspace(name, durable_writes, strategy_class, strategy_options):
|
|||||||
else:
|
else:
|
||||||
log.info("Not creating keyspace %s because it already exists", name)
|
log.info("Not creating keyspace %s because it already exists", name)
|
||||||
|
|
||||||
|
|
||||||
def delete_keyspace(name):
|
def delete_keyspace(name):
|
||||||
msg = "Deprecated. Use drop_keyspace instead"
|
msg = "Deprecated. Use drop_keyspace instead"
|
||||||
warnings.warn(msg, DeprecationWarning)
|
warnings.warn(msg, DeprecationWarning)
|
||||||
log.warn(msg)
|
log.warn(msg)
|
||||||
drop_keyspace(name)
|
drop_keyspace(name)
|
||||||
|
|
||||||
|
|
||||||
def drop_keyspace(name):
|
def drop_keyspace(name):
|
||||||
"""
|
"""
|
||||||
Drops a keyspace, if it exists.
|
Drops a keyspace, if it exists.
|
||||||
@@ -156,6 +158,8 @@ def sync_table(model):
|
|||||||
"""
|
"""
|
||||||
Inspects the model and creates / updates the corresponding table and columns.
|
Inspects the model and creates / updates the corresponding table and columns.
|
||||||
|
|
||||||
|
This function can only add fields that are not part of the primary key.
|
||||||
|
|
||||||
Note that the attributes removed from the model are not deleted on the database.
|
Note that the attributes removed from the model are not deleted on the database.
|
||||||
They become effectively ignored by (will not show up on) the model.
|
They become effectively ignored by (will not show up on) the model.
|
||||||
|
|
||||||
@@ -184,6 +188,7 @@ def sync_table(model):
|
|||||||
|
|
||||||
# check for an existing column family
|
# check for an existing column family
|
||||||
if raw_cf_name not in tables:
|
if raw_cf_name not in tables:
|
||||||
|
log.debug("sync_table creating new table %s", cf_name)
|
||||||
qs = get_create_table(model)
|
qs = get_create_table(model)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -194,20 +199,26 @@ def sync_table(model):
|
|||||||
if "Cannot add already existing column family" not in unicode(ex):
|
if "Cannot add already existing column family" not in unicode(ex):
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
|
log.debug("sync_table checking existing table %s", cf_name)
|
||||||
# see if we're missing any columns
|
# see if we're missing any columns
|
||||||
fields = get_fields(model)
|
fields = get_fields(model)
|
||||||
field_names = [x.name for x in fields]
|
field_names = [x.name for x in fields]
|
||||||
|
model_fields = set()
|
||||||
for name, col in model._columns.items():
|
for name, col in model._columns.items():
|
||||||
if col.primary_key or col.partition_key:
|
if col.primary_key or col.partition_key:
|
||||||
continue # we can't mess with the PK
|
continue # we can't mess with the PK
|
||||||
|
model_fields.add(name)
|
||||||
if col.db_field_name in field_names:
|
if col.db_field_name in field_names:
|
||||||
continue # skip columns already defined
|
continue # skip columns already defined
|
||||||
|
|
||||||
# add missing column using the column def
|
# add missing column using the column def
|
||||||
query = "ALTER TABLE {} add {}".format(cf_name, col.get_column_def())
|
query = "ALTER TABLE {} add {}".format(cf_name, col.get_column_def())
|
||||||
log.debug(query)
|
|
||||||
execute(query)
|
execute(query)
|
||||||
|
|
||||||
|
db_fields_not_in_model = model_fields.symmetric_difference(field_names)
|
||||||
|
if db_fields_not_in_model:
|
||||||
|
log.info("Table %s has fields not referenced by model: %s", cf_name, db_fields_not_in_model)
|
||||||
|
|
||||||
update_compaction(model)
|
update_compaction(model)
|
||||||
|
|
||||||
table = cluster.metadata.keyspaces[ks_name].tables[raw_cf_name]
|
table = cluster.metadata.keyspaces[ks_name].tables[raw_cf_name]
|
||||||
@@ -391,7 +402,6 @@ def update_compaction(model):
|
|||||||
options = json.dumps(options).replace('"', "'")
|
options = json.dumps(options).replace('"', "'")
|
||||||
cf_name = model.column_family_name()
|
cf_name = model.column_family_name()
|
||||||
query = "ALTER TABLE {} with compaction = {}".format(cf_name, options)
|
query = "ALTER TABLE {} with compaction = {}".format(cf_name, options)
|
||||||
log.debug(query)
|
|
||||||
execute(query)
|
execute(query)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|||||||
@@ -42,7 +42,6 @@ class hybrid_classmethod(object):
|
|||||||
Allows a method to behave as both a class method and
|
Allows a method to behave as both a class method and
|
||||||
normal instance method depending on how it's called
|
normal instance method depending on how it's called
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, clsmethod, instmethod):
|
def __init__(self, clsmethod, instmethod):
|
||||||
self.clsmethod = clsmethod
|
self.clsmethod = clsmethod
|
||||||
self.instmethod = instmethod
|
self.instmethod = instmethod
|
||||||
|
|||||||
Reference in New Issue
Block a user