Handle race condition on building token map
If the token map is built prior to keyspace information being processed, the token map will not have any knowledge of keyspaces. This change causes the token map to be rebuilt whenever keyspaces change. Relates to pull request #36
This commit is contained in:
@@ -51,6 +51,11 @@ class Metadata(object):
|
||||
A map from keyspace names to matching :class:`~.KeyspaceMetadata` instances.
|
||||
"""
|
||||
|
||||
partitioner = None
|
||||
"""
|
||||
The string name of the partitioner for the cluster.
|
||||
"""
|
||||
|
||||
token_map = None
|
||||
""" A :class:`~.TokenMap` instance describing the ring topology. """
|
||||
|
||||
@@ -106,6 +111,8 @@ class Metadata(object):
|
||||
# remove not-just-added keyspaces
|
||||
self.keyspaces = dict((name, meta) for name, meta in self.keyspaces.items()
|
||||
if name in added_keyspaces)
|
||||
if self.token_map:
|
||||
self.token_map.rebuild(self.keyspaces.values())
|
||||
else:
|
||||
# keyspace is not None, table is not None
|
||||
try:
|
||||
@@ -249,6 +256,7 @@ class Metadata(object):
|
||||
system topology tables.
|
||||
For internal use only.
|
||||
"""
|
||||
self.partitioner = partitioner
|
||||
if partitioner.endswith('RandomPartitioner'):
|
||||
token_class = MD5Token
|
||||
elif partitioner.endswith('Murmur3Partitioner'):
|
||||
@@ -263,17 +271,17 @@ class Metadata(object):
|
||||
self.token_map = None
|
||||
return
|
||||
|
||||
token_to_primary_replica = {}
|
||||
token_to_host_owner = {}
|
||||
ring = []
|
||||
for host, token_strings in token_map.iteritems():
|
||||
for token_string in token_strings:
|
||||
token = token_class(token_string)
|
||||
ring.append(token)
|
||||
token_to_primary_replica[token] = host
|
||||
token_to_host_owner[token] = host
|
||||
|
||||
all_tokens = sorted(ring)
|
||||
self.token_map = TokenMap(
|
||||
token_class, token_to_primary_replica, all_tokens,
|
||||
token_class, token_to_host_owner, all_tokens,
|
||||
self.keyspaces.values())
|
||||
|
||||
def get_replicas(self, keyspace, key):
|
||||
@@ -335,7 +343,7 @@ class ReplicationStrategy(object):
|
||||
elif strategy_class.endswith("LocalStrategy"):
|
||||
return LocalStrategy()
|
||||
|
||||
def make_token_replica_map(token_to_primary_replica, ring):
|
||||
def make_token_replica_map(token_to_host_owner, ring):
|
||||
raise NotImplementedError()
|
||||
|
||||
def export_for_schema(self):
|
||||
@@ -350,13 +358,13 @@ class SimpleStrategy(ReplicationStrategy):
|
||||
def __init__(self, replication_factor):
|
||||
self.replication_factor = int(replication_factor)
|
||||
|
||||
def make_token_replica_map(self, token_to_primary_replica, ring):
|
||||
def make_token_replica_map(self, token_to_host_owner, ring):
|
||||
replica_map = {}
|
||||
for i in range(len(ring)):
|
||||
j, hosts = 0, set()
|
||||
while len(hosts) < self.replication_factor and j < len(ring):
|
||||
token = ring[(i + j) % len(ring)]
|
||||
hosts.add(token_to_primary_replica[token])
|
||||
hosts.add(token_to_host_owner[token])
|
||||
j += 1
|
||||
|
||||
replica_map[ring[i]] = hosts
|
||||
@@ -375,13 +383,13 @@ class NetworkTopologyStrategy(ReplicationStrategy):
|
||||
def __init__(self, dc_replication_factors):
|
||||
self.dc_replication_factors = dc_replication_factors
|
||||
|
||||
def make_token_replica_map(self, token_to_primary_replica, ring):
|
||||
def make_token_replica_map(self, token_to_host_owner, ring):
|
||||
# note: this does not account for hosts having different racks
|
||||
replica_map = {}
|
||||
for i in range(len(ring)):
|
||||
remaining = self.dc_replication_factors.copy()
|
||||
for j in range(len(ring)):
|
||||
host = token_to_primary_replica[ring[(i + j) % len(ring)]]
|
||||
host = token_to_host_owner[ring[(i + j) % len(ring)]]
|
||||
if not host.datacenter:
|
||||
continue
|
||||
|
||||
@@ -410,7 +418,7 @@ class LocalStrategy(ReplicationStrategy):
|
||||
|
||||
name = "LocalStrategy"
|
||||
|
||||
def make_token_replica_map(self, token_to_primary_replica, ring):
|
||||
def make_token_replica_map(self, token_to_host_owner, ring):
|
||||
return {}
|
||||
|
||||
def export_for_schema(self):
|
||||
@@ -725,6 +733,11 @@ class TokenMap(object):
|
||||
A subclass of :class:`.Token`, depending on what partitioner the cluster uses.
|
||||
"""
|
||||
|
||||
token_to_host_owner = None
|
||||
"""
|
||||
A map of :class:`.Token` objects to the :class:`.Host` that owns that token.
|
||||
"""
|
||||
|
||||
tokens_to_hosts_by_ks = None
|
||||
"""
|
||||
A map of keyspace names to a nested map of :class:`.Token` objects to
|
||||
@@ -736,22 +749,33 @@ class TokenMap(object):
|
||||
An ordered list of :class:`.Token` instances in the ring.
|
||||
"""
|
||||
|
||||
def __init__(self, token_class, token_to_primary_replica, all_tokens, keyspaces):
|
||||
def __init__(self, token_class, token_to_host_owner, all_tokens, keyspaces):
|
||||
self.token_class = token_class
|
||||
self.ring = all_tokens
|
||||
self.token_to_host_owner = token_to_host_owner
|
||||
|
||||
self.tokens_to_hosts_by_ks = {}
|
||||
for ks_metadata in keyspaces:
|
||||
self.rebuild(keyspaces)
|
||||
|
||||
def rebuild(self, current_keyspaces):
|
||||
"""
|
||||
Given an up-to-date list of :class:`.KeyspaceMetadata` instances, rebuild
|
||||
the per-keyspace replication map.
|
||||
"""
|
||||
tokens_to_hosts_by_ks = {}
|
||||
for ks_metadata in current_keyspaces:
|
||||
strategy = ks_metadata.replication_strategy
|
||||
if strategy is None:
|
||||
token_to_hosts = defaultdict(set)
|
||||
for token, host in token_to_primary_replica.items():
|
||||
for token, host in self.token_to_host_owner.items():
|
||||
token_to_hosts[token].add(host)
|
||||
self.tokens_to_hosts_by_ks[ks_metadata.name] = token_to_hosts
|
||||
tokens_to_hosts_by_ks[ks_metadata.name] = token_to_hosts
|
||||
else:
|
||||
self.tokens_to_hosts_by_ks[ks_metadata.name] = \
|
||||
tokens_to_hosts_by_ks[ks_metadata.name] = \
|
||||
strategy.make_token_replica_map(
|
||||
token_to_primary_replica, all_tokens)
|
||||
self.token_to_host_owner, self.ring)
|
||||
|
||||
self.tokens_to_hosts_by_ks = tokens_to_hosts_by_ks
|
||||
|
||||
def get_replicas(self, keyspace, token):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user