WIP: 1st working draft
This commit is contained in:
@@ -1695,6 +1695,7 @@ class ControlConnection(object):
|
||||
_SELECT_COLUMN_FAMILIES = "SELECT * FROM system.schema_columnfamilies"
|
||||
_SELECT_COLUMNS = "SELECT * FROM system.schema_columns"
|
||||
_SELECT_USERTYPES = "SELECT * FROM system.schema_usertypes"
|
||||
_SELECT_TRIGGERS = "SELECT * FROM system.schema_triggers"
|
||||
|
||||
_SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address, schema_version FROM system.peers"
|
||||
_SELECT_LOCAL = "SELECT cluster_name, data_center, rack, tokens, partitioner, schema_version FROM system.local WHERE key='local'"
|
||||
@@ -1895,13 +1896,15 @@ class ControlConnection(object):
|
||||
where_clause = " WHERE keyspace_name = '%s' AND columnfamily_name = '%s'" % (keyspace, table)
|
||||
cf_query = QueryMessage(query=self._SELECT_COLUMN_FAMILIES + where_clause, consistency_level=cl)
|
||||
col_query = QueryMessage(query=self._SELECT_COLUMNS + where_clause, consistency_level=cl)
|
||||
cf_result, col_result = connection.wait_for_responses(
|
||||
cf_query, col_query)
|
||||
triggers_query = QueryMessage(query=self._SELECT_TRIGGERS + where_clause, consistency_level=cl)
|
||||
cf_result, col_result, triggers_result = connection.wait_for_responses(
|
||||
cf_query, col_query, triggers_query)
|
||||
|
||||
log.debug("[control connection] Fetched table info for %s.%s, rebuilding metadata", keyspace, table)
|
||||
cf_result = dict_factory(*cf_result.results) if cf_result else {}
|
||||
col_result = dict_factory(*col_result.results) if col_result else {}
|
||||
self._cluster.metadata.table_changed(keyspace, table, cf_result, col_result)
|
||||
triggers_result = dict_factory(*triggers_result.results) if triggers_result else {}
|
||||
self._cluster.metadata.table_changed(keyspace, table, cf_result, col_result, triggers_result)
|
||||
elif usertype:
|
||||
# user defined types within this keyspace changed
|
||||
where_clause = " WHERE keyspace_name = '%s' AND type_name = '%s'" % (keyspace, usertype)
|
||||
@@ -1924,11 +1927,14 @@ class ControlConnection(object):
|
||||
QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl),
|
||||
QueryMessage(query=self._SELECT_COLUMN_FAMILIES, consistency_level=cl),
|
||||
QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl),
|
||||
QueryMessage(query=self._SELECT_USERTYPES, consistency_level=cl)
|
||||
QueryMessage(query=self._SELECT_USERTYPES, consistency_level=cl),
|
||||
QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl)
|
||||
]
|
||||
|
||||
responses = connection.wait_for_responses(*queries, fail_on_error=False)
|
||||
(ks_success, ks_result), (cf_success, cf_result), (col_success, col_result), (types_success, types_result) = responses
|
||||
(ks_success, ks_result), (cf_success, cf_result), \
|
||||
(col_success, col_result), (types_success, types_result), \
|
||||
(trigger_success, triggers_result) = responses
|
||||
|
||||
if ks_success:
|
||||
ks_result = dict_factory(*ks_result.results)
|
||||
@@ -1945,6 +1951,11 @@ class ControlConnection(object):
|
||||
else:
|
||||
raise col_result
|
||||
|
||||
if trigger_success:
|
||||
triggers_result = dict_factory(*triggers_result.results)
|
||||
else:
|
||||
raise triggers_result
|
||||
|
||||
# if we're connected to Cassandra < 2.1, the usertypes table will not exist
|
||||
if types_success:
|
||||
types_result = dict_factory(*types_result.results) if types_result.results else {}
|
||||
@@ -1956,7 +1967,7 @@ class ControlConnection(object):
|
||||
raise types_result
|
||||
|
||||
log.debug("[control connection] Fetched schema, rebuilding metadata")
|
||||
self._cluster.metadata.rebuild_schema(ks_result, types_result, cf_result, col_result)
|
||||
self._cluster.metadata.rebuild_schema(ks_result, types_result, cf_result, col_result, triggers_result)
|
||||
|
||||
def refresh_node_list_and_token_map(self, force_token_rebuild=False):
|
||||
try:
|
||||
|
||||
@@ -88,7 +88,7 @@ class Metadata(object):
|
||||
"""
|
||||
return "\n".join(ks.export_as_string() for ks in self.keyspaces.values())
|
||||
|
||||
def rebuild_schema(self, ks_results, type_results, cf_results, col_results):
|
||||
def rebuild_schema(self, ks_results, type_results, cf_results, col_results, triggers_result):
|
||||
"""
|
||||
Rebuild the view of the current schema from a fresh set of rows from
|
||||
the system schema tables.
|
||||
@@ -98,6 +98,7 @@ class Metadata(object):
|
||||
cf_def_rows = defaultdict(list)
|
||||
col_def_rows = defaultdict(lambda: defaultdict(list))
|
||||
usertype_rows = defaultdict(list)
|
||||
trigger_rows = defaultdict(lambda: defaultdict(list))
|
||||
|
||||
for row in cf_results:
|
||||
cf_def_rows[row["keyspace_name"]].append(row)
|
||||
@@ -110,12 +111,18 @@ class Metadata(object):
|
||||
for row in type_results:
|
||||
usertype_rows[row["keyspace_name"]].append(row)
|
||||
|
||||
for row in triggers_result:
|
||||
ksname = row["keyspace_name"]
|
||||
cfname = row["columnfamily_name"]
|
||||
trigger_rows[ksname][cfname].append(row)
|
||||
|
||||
current_keyspaces = set()
|
||||
for row in ks_results:
|
||||
keyspace_meta = self._build_keyspace_metadata(row)
|
||||
for table_row in cf_def_rows.get(keyspace_meta.name, []):
|
||||
table_meta = self._build_table_metadata(
|
||||
keyspace_meta, table_row, col_def_rows[keyspace_meta.name])
|
||||
keyspace_meta, table_row, col_def_rows[keyspace_meta.name],
|
||||
trigger_rows.get(keyspace_meta.name, {}))
|
||||
keyspace_meta.tables[table_meta.name] = table_meta
|
||||
|
||||
for usertype_row in usertype_rows.get(keyspace_meta.name, []):
|
||||
@@ -163,7 +170,7 @@ class Metadata(object):
|
||||
# the type was deleted
|
||||
self.keyspaces[keyspace].user_types.pop(name, None)
|
||||
|
||||
def table_changed(self, keyspace, table, cf_results, col_results):
|
||||
def table_changed(self, keyspace, table, cf_results, col_results, triggers_result):
|
||||
try:
|
||||
keyspace_meta = self.keyspaces[keyspace]
|
||||
except KeyError:
|
||||
@@ -178,7 +185,8 @@ class Metadata(object):
|
||||
else:
|
||||
assert len(cf_results) == 1
|
||||
keyspace_meta.tables[table] = self._build_table_metadata(
|
||||
keyspace_meta, cf_results[0], {table: col_results})
|
||||
keyspace_meta, cf_results[0], {table: col_results},
|
||||
{table: triggers_result})
|
||||
|
||||
def _keyspace_added(self, ksname):
|
||||
if self.token_map:
|
||||
@@ -204,7 +212,7 @@ class Metadata(object):
|
||||
return UserType(usertype_row['keyspace_name'], usertype_row['type_name'],
|
||||
usertype_row['field_names'], type_classes)
|
||||
|
||||
def _build_table_metadata(self, keyspace_metadata, row, col_rows):
|
||||
def _build_table_metadata(self, keyspace_metadata, row, col_rows, trigger_rows):
|
||||
cfname = row["columnfamily_name"]
|
||||
|
||||
comparator = types.lookup_casstype(row["comparator"])
|
||||
@@ -294,6 +302,12 @@ class Metadata(object):
|
||||
column_meta = self._build_column_metadata(table_meta, col_row)
|
||||
table_meta.columns[column_meta.name] = column_meta
|
||||
|
||||
if trigger_rows:
|
||||
log.debug("%s" % trigger_rows)
|
||||
for trigger_row in trigger_rows[cfname]:
|
||||
trigger_meta = self._build_trigger_metadata(table_meta, trigger_row)
|
||||
table_meta.triggers[trigger_meta.name] = trigger_meta
|
||||
|
||||
table_meta.options = self._build_table_options(row)
|
||||
table_meta.is_compact_storage = is_compact
|
||||
return table_meta
|
||||
@@ -330,6 +344,12 @@ class Metadata(object):
|
||||
else:
|
||||
return None
|
||||
|
||||
def _build_trigger_metadata(self, table_metadata, row):
|
||||
name = row["trigger_name"]
|
||||
options = row["trigger_options"]
|
||||
trigger_meta = TriggerMetadata(table_metadata, name, options)
|
||||
return trigger_meta
|
||||
|
||||
def rebuild_token_map(self, partitioner, token_map):
|
||||
"""
|
||||
Rebuild our view of the topology from fresh rows from the
|
||||
@@ -788,7 +808,12 @@ class TableMetadata(object):
|
||||
"max_compaction_threshold": "max_threshold",
|
||||
"compaction_strategy_class": "class"}
|
||||
|
||||
def __init__(self, keyspace_metadata, name, partition_key=None, clustering_key=None, columns=None, options=None):
|
||||
triggers = None
|
||||
"""
|
||||
A dict mapping trigger names to :class:`.TriggerMetadata` instances.
|
||||
"""
|
||||
|
||||
def __init__(self, keyspace_metadata, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None):
|
||||
self.keyspace = keyspace_metadata
|
||||
self.name = name
|
||||
self.partition_key = [] if partition_key is None else partition_key
|
||||
@@ -796,6 +821,7 @@ class TableMetadata(object):
|
||||
self.columns = OrderedDict() if columns is None else columns
|
||||
self.options = options
|
||||
self.comparator = None
|
||||
self.triggers = OrderedDict() if triggers is None else triggers
|
||||
|
||||
def export_as_string(self):
|
||||
"""
|
||||
@@ -1211,3 +1237,25 @@ class BytesToken(Token):
|
||||
"Tokens for ByteOrderedPartitioner should be strings (got %s)"
|
||||
% (type(token_string),))
|
||||
self.value = token_string
|
||||
|
||||
|
||||
class TriggerMetadata(object):
|
||||
"""
|
||||
A representation of a trigger for a table.
|
||||
"""
|
||||
|
||||
table = None
|
||||
""" The :class:`.TableMetadata` this trigger belongs to. """
|
||||
|
||||
name = None
|
||||
""" The string name of this trigger. """
|
||||
|
||||
options = None
|
||||
"""
|
||||
A dict mapping table option names to their specific settings for this
|
||||
table.
|
||||
"""
|
||||
def __init__(self, table_metadata, trigger_name, options=None):
|
||||
self.table = table_metadata
|
||||
self.name = trigger_name
|
||||
self.options = options
|
||||
|
||||
Reference in New Issue
Block a user