diff --git a/cassandra/metadata.py b/cassandra/metadata.py index a4f6eb36..2aa1fa6b 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -5,6 +5,7 @@ try: except ImportError: # Python <2.7 from cassandra.util import OrderedDict # NOQA from hashlib import md5 +from itertools import islice, cycle import json import logging import re @@ -394,25 +395,50 @@ class NetworkTopologyStrategy(ReplicationStrategy): for dc, rf in self.dc_replication_factors.items() if rf > 0) dcs = dict((h, h.datacenter) for h in set(token_to_host_owner.values())) + # build a map of DCs to lists of indexes into `ring` for tokens that + # belong to that DC + dc_to_token_offset = defaultdict(list) + for i, token in enumerate(ring): + host = token_to_host_owner[token] + dc_to_token_offset[dcs[host]].append(i) + + # A map of DCs to an index into the dc_to_token_offset value for that dc. + # This is how we keep track of advancing around the ring for each DC. + dc_to_current_index = defaultdict(int) + for i in ring_len_range: remaining = dc_rf_map.copy() - for j in ring_len_range: - token = ring[(i + j) % ring_len] - host = token_to_host_owner[token] - dc = dcs[host] - if not dc in remaining: - # we already have all replicas for this DC + replicas = replica_map[ring[i]] + + # go through each DC and find the replicas in that DC + for dc in dc_to_token_offset.keys(): + if dc not in remaining: continue - if not host in replica_map[ring[i]]: - replica_map[ring[i]].append(host) + # advance our per-DC index until we're up to at least the + # current token in the ring + token_offsets = dc_to_token_offset[dc] + index = dc_to_current_index[dc] + num_tokens = len(token_offsets) + while index < num_tokens and token_offsets[index] < i: + index += 1 + dc_to_current_index[dc] = index - if remaining[dc] == 1: - del remaining[dc] - if not remaining: + # now add the next RF distinct token owners to the set of + # replicas for this DC + for token_offset in islice(cycle(token_offsets), index, index + num_tokens): + host = token_to_host_owner[ring[token_offset]] + if host in replicas: + continue + + replicas.append(host) + dc_remaining = remaining[dc] - 1 + if dc_remaining == 0: + del remaining[dc] break - else: - remaining[dc] -= 1 + else: + remaining[dc] = dc_remaining + return replica_map def export_for_schema(self): diff --git a/cassandra/pool.py b/cassandra/pool.py index ffae74af..a8e54d05 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -107,9 +107,6 @@ class Host(object): return old def __eq__(self, other): - if not isinstance(other, Host): - return False - return self.address == other.address def __str__(self): diff --git a/tests/unit/test_host_connection_pool.py b/tests/unit/test_host_connection_pool.py index 871d091f..dd45cd95 100644 --- a/tests/unit/test_host_connection_pool.py +++ b/tests/unit/test_host_connection_pool.py @@ -225,5 +225,3 @@ class HostConnectionPoolTests(unittest.TestCase): self.assertEqual(a, b, 'Two Host instances should be equal when sharing.') self.assertNotEqual(a, c, 'Two Host instances should NOT be equal when using two different addresses.') self.assertNotEqual(b, c, 'Two Host instances should NOT be equal when using two different addresses.') - - self.assertFalse(a == '127.0.0.1')