Merge pull request #555 from datastax/301
PYTHON-301 - release_version in host metadata
This commit is contained in:
		| @@ -2068,11 +2068,10 @@ class ControlConnection(object): | ||||
|     """ | ||||
|  | ||||
|     _SELECT_PEERS = "SELECT * FROM system.peers" | ||||
|     _SELECT_PEERS_NO_TOKENS = "SELECT peer, data_center, rack, rpc_address, schema_version FROM system.peers" | ||||
|     _SELECT_PEERS_NO_TOKENS = "SELECT peer, data_center, rack, rpc_address, release_version, schema_version FROM system.peers" | ||||
|     _SELECT_LOCAL = "SELECT * FROM system.local WHERE key='local'" | ||||
|     _SELECT_LOCAL_NO_TOKENS = "SELECT cluster_name, data_center, rack, partitioner, release_version, schema_version FROM system.local WHERE key='local'" | ||||
|  | ||||
|  | ||||
|     _SELECT_SCHEMA_PEERS = "SELECT peer, rpc_address, schema_version FROM system.peers" | ||||
|     _SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'" | ||||
|  | ||||
| @@ -2351,14 +2350,13 @@ class ControlConnection(object): | ||||
|                 self._update_location_info(host, datacenter, rack) | ||||
|                 host.listen_address = local_row.get("listen_address") | ||||
|                 host.broadcast_address = local_row.get("broadcast_address") | ||||
|                 host.release_version = local_row.get("release_version") | ||||
|  | ||||
|             partitioner = local_row.get("partitioner") | ||||
|             tokens = local_row.get("tokens") | ||||
|             if partitioner and tokens: | ||||
|                 token_map[host] = tokens | ||||
|  | ||||
|             connection.server_version = local_row['release_version'] | ||||
|  | ||||
|         # Check metadata.partitioner to see if we haven't built anything yet. If | ||||
|         # every node in the cluster was in the contact points, we won't discover | ||||
|         # any new nodes, so we need this additional check.  (See PYTHON-90) | ||||
| @@ -2385,6 +2383,7 @@ class ControlConnection(object): | ||||
|                 should_rebuild_token_map |= self._update_location_info(host, datacenter, rack) | ||||
|  | ||||
|             host.broadcast_address = row.get("peer") | ||||
|             host.release_version = row.get("release_version") | ||||
|  | ||||
|             if partitioner and tokens: | ||||
|                 token_map[host] = tokens | ||||
|   | ||||
| @@ -232,8 +232,6 @@ class Connection(object): | ||||
|     is_control_connection = False | ||||
|     signaled_error = False  # used for flagging at the pool level | ||||
|  | ||||
|     _server_version = None | ||||
|  | ||||
|     _iobuf = None | ||||
|     _current_frame = None | ||||
|  | ||||
| @@ -829,18 +827,6 @@ class Connection(object): | ||||
|     def reset_idle(self): | ||||
|         self.msg_received = False | ||||
|  | ||||
|     @property | ||||
|     def server_version(self): | ||||
|         if self._server_version is None: | ||||
|             query_message = QueryMessage(query="SELECT release_version FROM system.local", consistency_level=ConsistencyLevel.ONE) | ||||
|             message = self.wait_for_response(query_message) | ||||
|             self._server_version = message.results[1][0][0]  # (col names, rows)[rows][first row][only item] | ||||
|         return self._server_version | ||||
|  | ||||
|     @server_version.setter | ||||
|     def server_version(self, version): | ||||
|         self._server_version = version | ||||
|  | ||||
|     def __str__(self): | ||||
|         status = "" | ||||
|         if self.is_defunct: | ||||
|   | ||||
| @@ -117,13 +117,15 @@ class Metadata(object): | ||||
|  | ||||
|     def refresh(self, connection, timeout, target_type=None, change_type=None, **kwargs): | ||||
|  | ||||
|         server_version = self.get_host(connection.host).release_version | ||||
|         parser = get_schema_parser(connection, server_version, timeout) | ||||
|  | ||||
|         if not target_type: | ||||
|             self._rebuild_all(connection, timeout) | ||||
|             self._rebuild_all(parser) | ||||
|             return | ||||
|  | ||||
|         tt_lower = target_type.lower() | ||||
|         try: | ||||
|             parser = get_schema_parser(connection, timeout) | ||||
|             parse_method = getattr(parser, 'get_' + tt_lower) | ||||
|             meta = parse_method(self.keyspaces, **kwargs) | ||||
|             if meta: | ||||
| @@ -135,12 +137,7 @@ class Metadata(object): | ||||
|         except AttributeError: | ||||
|             raise ValueError("Unknown schema target_type: '%s'" % target_type) | ||||
|  | ||||
|     def _rebuild_all(self, connection, timeout): | ||||
|         """ | ||||
|         For internal use only. | ||||
|         """ | ||||
|         parser = get_schema_parser(connection, timeout) | ||||
|  | ||||
|     def _rebuild_all(self, parser): | ||||
|         current_keyspaces = set() | ||||
|         for keyspace_meta in parser.get_all_keyspaces(): | ||||
|             current_keyspaces.add(keyspace_meta.name) | ||||
| @@ -2455,8 +2452,7 @@ class MaterializedViewMetadata(object): | ||||
|         return self.as_cql_query(formatted=True) + ";" | ||||
|  | ||||
|  | ||||
| def get_schema_parser(connection, timeout): | ||||
|     server_version = connection.server_version | ||||
| def get_schema_parser(connection, server_version, timeout): | ||||
|     if server_version.startswith('3'): | ||||
|         return SchemaParserV3(connection, timeout) | ||||
|     else: | ||||
|   | ||||
| @@ -78,6 +78,11 @@ class Host(object): | ||||
|     up or down. | ||||
|     """ | ||||
|  | ||||
|     release_version = None | ||||
|     """ | ||||
|     release_version as queried from the control connection system tables | ||||
|     """ | ||||
|  | ||||
|     _datacenter = None | ||||
|     _rack = None | ||||
|     _reconnection_handler = None | ||||
|   | ||||
| @@ -142,7 +142,8 @@ class SchemaMetadataTests(BasicSegregatedKeyspaceUnitTestCase): | ||||
|         self.assertEqual([], tablemeta.clustering_key) | ||||
|         self.assertEqual([u'a', u'b', u'c'], sorted(tablemeta.columns.keys())) | ||||
|  | ||||
|         parser = get_schema_parser(self.cluster.control_connection._connection, 1) | ||||
|         cc = self.cluster.control_connection._connection | ||||
|         parser = get_schema_parser(cc, str(CASS_SERVER_VERSION[0]), 1) | ||||
|  | ||||
|         for option in tablemeta.options: | ||||
|             self.assertIn(option, parser.recognized_table_options) | ||||
| @@ -1919,7 +1920,7 @@ class BadMetaTest(unittest.TestCase): | ||||
|         cls.session.execute("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name) | ||||
|         cls.session.set_keyspace(cls.keyspace_name) | ||||
|         connection = cls.cluster.control_connection._connection | ||||
|         cls.parser_class = get_schema_parser(connection, timeout=20).__class__ | ||||
|         cls.parser_class = get_schema_parser(connection, str(CASS_SERVER_VERSION[0]), timeout=20).__class__ | ||||
|  | ||||
|     @classmethod | ||||
|     def teardown_class(cls): | ||||
|   | ||||
| @@ -380,9 +380,7 @@ class IndexTest(unittest.TestCase): | ||||
|         column_meta.table.name = 'table_name_here' | ||||
|         column_meta.table.keyspace_name = 'keyspace_name_here' | ||||
|         column_meta.table.columns = {column_meta.name: column_meta} | ||||
|         connection = Mock() | ||||
|         connection.server_version = '2.1.0' | ||||
|         parser = get_schema_parser(connection, 0.1) | ||||
|         parser = get_schema_parser(Mock(), '2.1.0', 0.1) | ||||
|  | ||||
|         row = {'index_name': 'index_name_here', 'index_type': 'index_type_here'} | ||||
|         index_meta = parser._build_index_metadata(column_meta, row) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Adam Holmberg
					Adam Holmberg