Make DCAware LBP tolerate DC changes during query plan
Fixes a RuntimeError that would raise if the DCAwareRoundRobinPolicy DC:host map changed during generation. PYTHON-297
This commit is contained in:
@@ -267,11 +267,11 @@ class DCAwareRoundRobinPolicy(LoadBalancingPolicy):
|
|||||||
for host in islice(cycle(local_live), pos, pos + len(local_live)):
|
for host in islice(cycle(local_live), pos, pos + len(local_live)):
|
||||||
yield host
|
yield host
|
||||||
|
|
||||||
for dc, current_dc_hosts in six.iteritems(self._dc_live_hosts):
|
# the dict can change, so get candidate DCs iterating over keys of a copy
|
||||||
if dc == self.local_dc:
|
other_dcs = [dc for dc in self._dc_live_hosts.copy().keys() if dc != self.local_dc]
|
||||||
continue
|
for dc in other_dcs:
|
||||||
|
remote_live = self._dc_live_hosts.get(dc, ())
|
||||||
for host in current_dc_hosts[:self.used_hosts_per_remote_dc]:
|
for host in remote_live[:self.used_hosts_per_remote_dc]:
|
||||||
yield host
|
yield host
|
||||||
|
|
||||||
def on_up(self, host):
|
def on_up(self, host):
|
||||||
|
|||||||
@@ -292,6 +292,160 @@ class DCAwareRoundRobinPolicyTest(unittest.TestCase):
|
|||||||
qplan = list(policy.make_query_plan())
|
qplan = list(policy.make_query_plan())
|
||||||
self.assertEqual(qplan, [])
|
self.assertEqual(qplan, [])
|
||||||
|
|
||||||
|
def test_modification_during_generation(self):
|
||||||
|
hosts = [Host(i, SimpleConvictionPolicy) for i in range(4)]
|
||||||
|
for h in hosts[:2]:
|
||||||
|
h.set_location_info("dc1", "rack1")
|
||||||
|
for h in hosts[2:]:
|
||||||
|
h.set_location_info("dc2", "rack1")
|
||||||
|
|
||||||
|
policy = DCAwareRoundRobinPolicy("dc1", used_hosts_per_remote_dc=3)
|
||||||
|
policy.populate(Mock(), hosts)
|
||||||
|
|
||||||
|
# The general concept here is to change thee internal state of the
|
||||||
|
# policy during plan generation. In this case we use a grey-box
|
||||||
|
# approach that changes specific things during known phases of the
|
||||||
|
# generator.
|
||||||
|
|
||||||
|
new_host = Host(4, SimpleConvictionPolicy)
|
||||||
|
new_host.set_location_info("dc1", "rack1")
|
||||||
|
|
||||||
|
# new local before iteration
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
policy.on_up(new_host)
|
||||||
|
# local list is not bound yet, so we get to see that one
|
||||||
|
self.assertEqual(len(list(plan)), 3 + 2)
|
||||||
|
|
||||||
|
# remove local before iteration
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
policy.on_down(new_host)
|
||||||
|
# local list is not bound yet, so we don't see it
|
||||||
|
self.assertEqual(len(list(plan)), 2 + 2)
|
||||||
|
|
||||||
|
# new local after starting iteration
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
next(plan)
|
||||||
|
policy.on_up(new_host)
|
||||||
|
# local list was is bound, and one consumed, so we only see the other original
|
||||||
|
self.assertEqual(len(list(plan)), 1 + 2)
|
||||||
|
|
||||||
|
# remove local after traversing available
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
for _ in range(3):
|
||||||
|
next(plan)
|
||||||
|
policy.on_down(new_host)
|
||||||
|
# we should be past the local list
|
||||||
|
self.assertEqual(len(list(plan)), 0 + 2)
|
||||||
|
|
||||||
|
# REMOTES CHANGE
|
||||||
|
new_host.set_location_info("dc2", "rack1")
|
||||||
|
|
||||||
|
# new remote after traversing local, but not starting remote
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
for _ in range(2):
|
||||||
|
next(plan)
|
||||||
|
policy.on_up(new_host)
|
||||||
|
# list is updated before we get to it
|
||||||
|
self.assertEqual(len(list(plan)), 0 + 3)
|
||||||
|
|
||||||
|
# remove remote after traversing local, but not starting remote
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
for _ in range(2):
|
||||||
|
next(plan)
|
||||||
|
policy.on_down(new_host)
|
||||||
|
# list is updated before we get to it
|
||||||
|
self.assertEqual(len(list(plan)), 0 + 2)
|
||||||
|
|
||||||
|
# new remote after traversing local, and starting remote
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
for _ in range(3):
|
||||||
|
next(plan)
|
||||||
|
policy.on_up(new_host)
|
||||||
|
# slice is already made, and we've consumed one
|
||||||
|
self.assertEqual(len(list(plan)), 0 + 1)
|
||||||
|
|
||||||
|
# remove remote after traversing local, and starting remote
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
for _ in range(3):
|
||||||
|
next(plan)
|
||||||
|
policy.on_down(new_host)
|
||||||
|
# slice is created with all present, and we've consumed one
|
||||||
|
self.assertEqual(len(list(plan)), 0 + 2)
|
||||||
|
|
||||||
|
# local DC disappears after finishing it, but not starting remote
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
for _ in range(2):
|
||||||
|
next(plan)
|
||||||
|
policy.on_down(hosts[0])
|
||||||
|
policy.on_down(hosts[1])
|
||||||
|
# dict traversal starts as normal
|
||||||
|
self.assertEqual(len(list(plan)), 0 + 2)
|
||||||
|
policy.on_up(hosts[0])
|
||||||
|
policy.on_up(hosts[1])
|
||||||
|
|
||||||
|
# PYTHON-297 addresses the following cases, where DCs come and go
|
||||||
|
# during generation
|
||||||
|
# local DC disappears after finishing it, and starting remote
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
for _ in range(3):
|
||||||
|
next(plan)
|
||||||
|
policy.on_down(hosts[0])
|
||||||
|
policy.on_down(hosts[1])
|
||||||
|
# dict traversal has begun and consumed one
|
||||||
|
self.assertEqual(len(list(plan)), 0 + 1)
|
||||||
|
policy.on_up(hosts[0])
|
||||||
|
policy.on_up(hosts[1])
|
||||||
|
|
||||||
|
# remote DC disappears after finishing local, but not starting remote
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
for _ in range(2):
|
||||||
|
next(plan)
|
||||||
|
policy.on_down(hosts[2])
|
||||||
|
policy.on_down(hosts[3])
|
||||||
|
# nothing left
|
||||||
|
self.assertEqual(len(list(plan)), 0 + 0)
|
||||||
|
policy.on_up(hosts[2])
|
||||||
|
policy.on_up(hosts[3])
|
||||||
|
|
||||||
|
# remote DC disappears while traversing it
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
for _ in range(3):
|
||||||
|
next(plan)
|
||||||
|
policy.on_down(hosts[2])
|
||||||
|
policy.on_down(hosts[3])
|
||||||
|
# we continue with remainder of original list
|
||||||
|
self.assertEqual(len(list(plan)), 0 + 1)
|
||||||
|
policy.on_up(hosts[2])
|
||||||
|
policy.on_up(hosts[3])
|
||||||
|
|
||||||
|
|
||||||
|
another_host = Host(5, SimpleConvictionPolicy)
|
||||||
|
another_host.set_location_info("dc3", "rack1")
|
||||||
|
new_host.set_location_info("dc3", "rack1")
|
||||||
|
|
||||||
|
# new DC while traversing remote
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
for _ in range(3):
|
||||||
|
next(plan)
|
||||||
|
policy.on_up(new_host)
|
||||||
|
policy.on_up(another_host)
|
||||||
|
# we continue with remainder of original list
|
||||||
|
self.assertEqual(len(list(plan)), 0 + 1)
|
||||||
|
|
||||||
|
# remote DC disappears after finishing it
|
||||||
|
plan = policy.make_query_plan()
|
||||||
|
for _ in range(3):
|
||||||
|
next(plan)
|
||||||
|
last_host_in_this_dc = next(plan)
|
||||||
|
if last_host_in_this_dc in (new_host, another_host):
|
||||||
|
down_hosts = [new_host, another_host]
|
||||||
|
else:
|
||||||
|
down_hosts = hosts[2:]
|
||||||
|
for h in down_hosts:
|
||||||
|
policy.on_down(h)
|
||||||
|
# the last DC has two
|
||||||
|
self.assertEqual(len(list(plan)), 0 + 2)
|
||||||
|
|
||||||
def test_no_live_nodes(self):
|
def test_no_live_nodes(self):
|
||||||
"""
|
"""
|
||||||
Ensure query plan for a downed cluster will execute without errors
|
Ensure query plan for a downed cluster will execute without errors
|
||||||
|
|||||||
Reference in New Issue
Block a user