From aa96cdc89eca65598058679946086a1b45132905 Mon Sep 17 00:00:00 2001 From: Mikhail Stepura Date: Wed, 27 Aug 2014 14:29:19 -0700 Subject: [PATCH] WIP: 1st working draft --- cassandra/cluster.py | 23 ++++++++++++----- cassandra/metadata.py | 60 ++++++++++++++++++++++++++++++++++++++----- 2 files changed, 71 insertions(+), 12 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index fa5defd1..4771dc2a 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1695,6 +1695,7 @@ class ControlConnection(object): _SELECT_COLUMN_FAMILIES = "SELECT * FROM system.schema_columnfamilies" _SELECT_COLUMNS = "SELECT * FROM system.schema_columns" _SELECT_USERTYPES = "SELECT * FROM system.schema_usertypes" + _SELECT_TRIGGERS = "SELECT * FROM system.schema_triggers" _SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address, schema_version FROM system.peers" _SELECT_LOCAL = "SELECT cluster_name, data_center, rack, tokens, partitioner, schema_version FROM system.local WHERE key='local'" @@ -1895,13 +1896,15 @@ class ControlConnection(object): where_clause = " WHERE keyspace_name = '%s' AND columnfamily_name = '%s'" % (keyspace, table) cf_query = QueryMessage(query=self._SELECT_COLUMN_FAMILIES + where_clause, consistency_level=cl) col_query = QueryMessage(query=self._SELECT_COLUMNS + where_clause, consistency_level=cl) - cf_result, col_result = connection.wait_for_responses( - cf_query, col_query) + triggers_query = QueryMessage(query=self._SELECT_TRIGGERS + where_clause, consistency_level=cl) + cf_result, col_result, triggers_result = connection.wait_for_responses( + cf_query, col_query, triggers_query) log.debug("[control connection] Fetched table info for %s.%s, rebuilding metadata", keyspace, table) cf_result = dict_factory(*cf_result.results) if cf_result else {} col_result = dict_factory(*col_result.results) if col_result else {} - self._cluster.metadata.table_changed(keyspace, table, cf_result, col_result) + triggers_result = dict_factory(*triggers_result.results) if triggers_result else {} + self._cluster.metadata.table_changed(keyspace, table, cf_result, col_result, triggers_result) elif usertype: # user defined types within this keyspace changed where_clause = " WHERE keyspace_name = '%s' AND type_name = '%s'" % (keyspace, usertype) @@ -1924,11 +1927,14 @@ class ControlConnection(object): QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl), QueryMessage(query=self._SELECT_COLUMN_FAMILIES, consistency_level=cl), QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl), - QueryMessage(query=self._SELECT_USERTYPES, consistency_level=cl) + QueryMessage(query=self._SELECT_USERTYPES, consistency_level=cl), + QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl) ] responses = connection.wait_for_responses(*queries, fail_on_error=False) - (ks_success, ks_result), (cf_success, cf_result), (col_success, col_result), (types_success, types_result) = responses + (ks_success, ks_result), (cf_success, cf_result), \ + (col_success, col_result), (types_success, types_result), \ + (trigger_success, triggers_result) = responses if ks_success: ks_result = dict_factory(*ks_result.results) @@ -1945,6 +1951,11 @@ class ControlConnection(object): else: raise col_result + if trigger_success: + triggers_result = dict_factory(*triggers_result.results) + else: + raise triggers_result + # if we're connected to Cassandra < 2.1, the usertypes table will not exist if types_success: types_result = dict_factory(*types_result.results) if types_result.results else {} @@ -1956,7 +1967,7 @@ class ControlConnection(object): raise types_result log.debug("[control connection] Fetched schema, rebuilding metadata") - self._cluster.metadata.rebuild_schema(ks_result, types_result, cf_result, col_result) + self._cluster.metadata.rebuild_schema(ks_result, types_result, cf_result, col_result, triggers_result) def refresh_node_list_and_token_map(self, force_token_rebuild=False): try: diff --git a/cassandra/metadata.py b/cassandra/metadata.py index cd793ea1..96fd9901 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -88,7 +88,7 @@ class Metadata(object): """ return "\n".join(ks.export_as_string() for ks in self.keyspaces.values()) - def rebuild_schema(self, ks_results, type_results, cf_results, col_results): + def rebuild_schema(self, ks_results, type_results, cf_results, col_results, triggers_result): """ Rebuild the view of the current schema from a fresh set of rows from the system schema tables. @@ -98,6 +98,7 @@ class Metadata(object): cf_def_rows = defaultdict(list) col_def_rows = defaultdict(lambda: defaultdict(list)) usertype_rows = defaultdict(list) + trigger_rows = defaultdict(lambda: defaultdict(list)) for row in cf_results: cf_def_rows[row["keyspace_name"]].append(row) @@ -110,12 +111,18 @@ class Metadata(object): for row in type_results: usertype_rows[row["keyspace_name"]].append(row) + for row in triggers_result: + ksname = row["keyspace_name"] + cfname = row["columnfamily_name"] + trigger_rows[ksname][cfname].append(row) + current_keyspaces = set() for row in ks_results: keyspace_meta = self._build_keyspace_metadata(row) for table_row in cf_def_rows.get(keyspace_meta.name, []): table_meta = self._build_table_metadata( - keyspace_meta, table_row, col_def_rows[keyspace_meta.name]) + keyspace_meta, table_row, col_def_rows[keyspace_meta.name], + trigger_rows.get(keyspace_meta.name, {})) keyspace_meta.tables[table_meta.name] = table_meta for usertype_row in usertype_rows.get(keyspace_meta.name, []): @@ -163,7 +170,7 @@ class Metadata(object): # the type was deleted self.keyspaces[keyspace].user_types.pop(name, None) - def table_changed(self, keyspace, table, cf_results, col_results): + def table_changed(self, keyspace, table, cf_results, col_results, triggers_result): try: keyspace_meta = self.keyspaces[keyspace] except KeyError: @@ -178,7 +185,8 @@ class Metadata(object): else: assert len(cf_results) == 1 keyspace_meta.tables[table] = self._build_table_metadata( - keyspace_meta, cf_results[0], {table: col_results}) + keyspace_meta, cf_results[0], {table: col_results}, + {table: triggers_result}) def _keyspace_added(self, ksname): if self.token_map: @@ -204,7 +212,7 @@ class Metadata(object): return UserType(usertype_row['keyspace_name'], usertype_row['type_name'], usertype_row['field_names'], type_classes) - def _build_table_metadata(self, keyspace_metadata, row, col_rows): + def _build_table_metadata(self, keyspace_metadata, row, col_rows, trigger_rows): cfname = row["columnfamily_name"] comparator = types.lookup_casstype(row["comparator"]) @@ -294,6 +302,12 @@ class Metadata(object): column_meta = self._build_column_metadata(table_meta, col_row) table_meta.columns[column_meta.name] = column_meta + if trigger_rows: + log.debug("%s" % trigger_rows) + for trigger_row in trigger_rows[cfname]: + trigger_meta = self._build_trigger_metadata(table_meta, trigger_row) + table_meta.triggers[trigger_meta.name] = trigger_meta + table_meta.options = self._build_table_options(row) table_meta.is_compact_storage = is_compact return table_meta @@ -330,6 +344,12 @@ class Metadata(object): else: return None + def _build_trigger_metadata(self, table_metadata, row): + name = row["trigger_name"] + options = row["trigger_options"] + trigger_meta = TriggerMetadata(table_metadata, name, options) + return trigger_meta + def rebuild_token_map(self, partitioner, token_map): """ Rebuild our view of the topology from fresh rows from the @@ -788,7 +808,12 @@ class TableMetadata(object): "max_compaction_threshold": "max_threshold", "compaction_strategy_class": "class"} - def __init__(self, keyspace_metadata, name, partition_key=None, clustering_key=None, columns=None, options=None): + triggers = None + """ + A dict mapping trigger names to :class:`.TriggerMetadata` instances. + """ + + def __init__(self, keyspace_metadata, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None): self.keyspace = keyspace_metadata self.name = name self.partition_key = [] if partition_key is None else partition_key @@ -796,6 +821,7 @@ class TableMetadata(object): self.columns = OrderedDict() if columns is None else columns self.options = options self.comparator = None + self.triggers = OrderedDict() if triggers is None else triggers def export_as_string(self): """ @@ -1211,3 +1237,25 @@ class BytesToken(Token): "Tokens for ByteOrderedPartitioner should be strings (got %s)" % (type(token_string),)) self.value = token_string + + +class TriggerMetadata(object): + """ + A representation of a trigger for a table. + """ + + table = None + """ The :class:`.TableMetadata` this trigger belongs to. """ + + name = None + """ The string name of this trigger. """ + + options = None + """ + A dict mapping table option names to their specific settings for this + table. + """ + def __init__(self, table_metadata, trigger_name, options=None): + self.table = table_metadata + self.name = trigger_name + self.options = options