Avoid quadratic token processing when a datacenter's replication factor (RF) exceeds its number of nodes
PYTHON-379
This commit is contained in:
@@ -469,8 +469,6 @@ class NetworkTopologyStrategy(ReplicationStrategy):
|
|||||||
(str(k), int(v)) for k, v in dc_replication_factors.items())
|
(str(k), int(v)) for k, v in dc_replication_factors.items())
|
||||||
|
|
||||||
def make_token_replica_map(self, token_to_host_owner, ring):
|
def make_token_replica_map(self, token_to_host_owner, ring):
|
||||||
# note: this does not account for hosts having different racks
|
|
||||||
replica_map = defaultdict(list)
|
|
||||||
dc_rf_map = dict((dc, int(rf))
|
dc_rf_map = dict((dc, int(rf))
|
||||||
for dc, rf in self.dc_replication_factors.items() if rf > 0)
|
for dc, rf in self.dc_replication_factors.items() if rf > 0)
|
||||||
|
|
||||||
@@ -478,16 +476,19 @@ class NetworkTopologyStrategy(ReplicationStrategy):
|
|||||||
# belong to that DC
|
# belong to that DC
|
||||||
dc_to_token_offset = defaultdict(list)
|
dc_to_token_offset = defaultdict(list)
|
||||||
dc_racks = defaultdict(set)
|
dc_racks = defaultdict(set)
|
||||||
|
hosts_per_dc = defaultdict(set)
|
||||||
for i, token in enumerate(ring):
|
for i, token in enumerate(ring):
|
||||||
host = token_to_host_owner[token]
|
host = token_to_host_owner[token]
|
||||||
dc_to_token_offset[host.datacenter].append(i)
|
dc_to_token_offset[host.datacenter].append(i)
|
||||||
if host.datacenter and host.rack:
|
if host.datacenter and host.rack:
|
||||||
dc_racks[host.datacenter].add(host.rack)
|
dc_racks[host.datacenter].add(host.rack)
|
||||||
|
hosts_per_dc[host.datacenter].add(host)
|
||||||
|
|
||||||
# A map of DCs to an index into the dc_to_token_offset value for that dc.
|
# A map of DCs to an index into the dc_to_token_offset value for that dc.
|
||||||
# This is how we keep track of advancing around the ring for each DC.
|
# This is how we keep track of advancing around the ring for each DC.
|
||||||
dc_to_current_index = defaultdict(int)
|
dc_to_current_index = defaultdict(int)
|
||||||
|
|
||||||
|
replica_map = defaultdict(list)
|
||||||
for i in range(len(ring)):
|
for i in range(len(ring)):
|
||||||
replicas = replica_map[ring[i]]
|
replicas = replica_map[ring[i]]
|
||||||
|
|
||||||
@@ -506,12 +507,14 @@ class NetworkTopologyStrategy(ReplicationStrategy):
|
|||||||
dc_to_current_index[dc] = index
|
dc_to_current_index[dc] = index
|
||||||
|
|
||||||
replicas_remaining = dc_rf_map[dc]
|
replicas_remaining = dc_rf_map[dc]
|
||||||
|
replicas_this_dc = 0
|
||||||
skipped_hosts = []
|
skipped_hosts = []
|
||||||
racks_placed = set()
|
racks_placed = set()
|
||||||
racks_this_dc = dc_racks[dc]
|
racks_this_dc = dc_racks[dc]
|
||||||
|
hosts_this_dc = len(hosts_per_dc[dc])
|
||||||
for token_offset in islice(cycle(token_offsets), index, index + num_tokens):
|
for token_offset in islice(cycle(token_offsets), index, index + num_tokens):
|
||||||
host = token_to_host_owner[ring[token_offset]]
|
host = token_to_host_owner[ring[token_offset]]
|
||||||
if replicas_remaining == 0:
|
if replicas_remaining == 0 or replicas_this_dc == hosts_this_dc:
|
||||||
break
|
break
|
||||||
|
|
||||||
if host in replicas:
|
if host in replicas:
|
||||||
@@ -522,6 +525,7 @@ class NetworkTopologyStrategy(ReplicationStrategy):
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
replicas.append(host)
|
replicas.append(host)
|
||||||
|
replicas_this_dc += 1
|
||||||
replicas_remaining -= 1
|
replicas_remaining -= 1
|
||||||
racks_placed.add(host.rack)
|
racks_placed.add(host.rack)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user