Optimize NetworkTopologyStrategy (NTS) replica map building
In the case where one DC used vnodes and the other did not, the previous algorithm hit a worst case of O(n^2) in the number of tokens in the ring.
This commit is contained in:
@@ -5,6 +5,7 @@ try:
|
|||||||
except ImportError: # Python <2.7
|
except ImportError: # Python <2.7
|
||||||
from cassandra.util import OrderedDict # NOQA
|
from cassandra.util import OrderedDict # NOQA
|
||||||
from hashlib import md5
|
from hashlib import md5
|
||||||
|
from itertools import islice, cycle
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
@@ -394,25 +395,50 @@ class NetworkTopologyStrategy(ReplicationStrategy):
|
|||||||
for dc, rf in self.dc_replication_factors.items() if rf > 0)
|
for dc, rf in self.dc_replication_factors.items() if rf > 0)
|
||||||
dcs = dict((h, h.datacenter) for h in set(token_to_host_owner.values()))
|
dcs = dict((h, h.datacenter) for h in set(token_to_host_owner.values()))
|
||||||
|
|
||||||
|
# build a map of DCs to lists of indexes into `ring` for tokens that
|
||||||
|
# belong to that DC
|
||||||
|
dc_to_token_offset = defaultdict(list)
|
||||||
|
for i, token in enumerate(ring):
|
||||||
|
host = token_to_host_owner[token]
|
||||||
|
dc_to_token_offset[dcs[host]].append(i)
|
||||||
|
|
||||||
|
# A map of DCs to an index into the dc_to_token_offset value for that dc.
|
||||||
|
# This is how we keep track of advancing around the ring for each DC.
|
||||||
|
dc_to_current_index = defaultdict(int)
|
||||||
|
|
||||||
for i in ring_len_range:
|
for i in ring_len_range:
|
||||||
remaining = dc_rf_map.copy()
|
remaining = dc_rf_map.copy()
|
||||||
for j in ring_len_range:
|
replicas = replica_map[ring[i]]
|
||||||
token = ring[(i + j) % ring_len]
|
|
||||||
host = token_to_host_owner[token]
|
# go through each DC and find the replicas in that DC
|
||||||
dc = dcs[host]
|
for dc in dc_to_token_offset.keys():
|
||||||
if not dc in remaining:
|
if dc not in remaining:
|
||||||
# we already have all replicas for this DC
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if not host in replica_map[ring[i]]:
|
# advance our per-DC index until we're up to at least the
|
||||||
replica_map[ring[i]].append(host)
|
# current token in the ring
|
||||||
|
token_offsets = dc_to_token_offset[dc]
|
||||||
|
index = dc_to_current_index[dc]
|
||||||
|
num_tokens = len(token_offsets)
|
||||||
|
while index < num_tokens and token_offsets[index] < i:
|
||||||
|
index += 1
|
||||||
|
dc_to_current_index[dc] = index
|
||||||
|
|
||||||
if remaining[dc] == 1:
|
# now add the next RF distinct token owners to the set of
|
||||||
del remaining[dc]
|
# replicas for this DC
|
||||||
if not remaining:
|
for token_offset in islice(cycle(token_offsets), index, index + num_tokens):
|
||||||
|
host = token_to_host_owner[ring[token_offset]]
|
||||||
|
if host in replicas:
|
||||||
|
continue
|
||||||
|
|
||||||
|
replicas.append(host)
|
||||||
|
dc_remaining = remaining[dc] - 1
|
||||||
|
if dc_remaining == 0:
|
||||||
|
del remaining[dc]
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
remaining[dc] -= 1
|
remaining[dc] = dc_remaining
|
||||||
|
|
||||||
return replica_map
|
return replica_map
|
||||||
|
|
||||||
def export_for_schema(self):
|
def export_for_schema(self):
|
||||||
|
|||||||
@@ -107,9 +107,6 @@ class Host(object):
|
|||||||
return old
|
return old
|
||||||
|
|
||||||
def __eq__(self, other):
|
def __eq__(self, other):
|
||||||
if not isinstance(other, Host):
|
|
||||||
return False
|
|
||||||
|
|
||||||
return self.address == other.address
|
return self.address == other.address
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
|
|||||||
@@ -225,5 +225,3 @@ class HostConnectionPoolTests(unittest.TestCase):
|
|||||||
self.assertEqual(a, b, 'Two Host instances should be equal when sharing.')
|
self.assertEqual(a, b, 'Two Host instances should be equal when sharing.')
|
||||||
self.assertNotEqual(a, c, 'Two Host instances should NOT be equal when using two different addresses.')
|
self.assertNotEqual(a, c, 'Two Host instances should NOT be equal when using two different addresses.')
|
||||||
self.assertNotEqual(b, c, 'Two Host instances should NOT be equal when using two different addresses.')
|
self.assertNotEqual(b, c, 'Two Host instances should NOT be equal when using two different addresses.')
|
||||||
|
|
||||||
self.assertFalse(a == '127.0.0.1')
|
|
||||||
|
|||||||
Reference in New Issue
Block a user