From d0ee3bc5470aea7fa67d5b80189e51cb68ec64b6 Mon Sep 17 00:00:00 2001
From: Adam Holmberg
Date: Wed, 22 Apr 2015 12:58:50 -0500
Subject: [PATCH] Make DCAware LBP tolerate DC changes during query plan

Fixes a RuntimeError that would raise if the DCAwareRoundRobinPolicy
DC:host map changed during generation.

PYTHON-297
---
 cassandra/policies.py       |  10 +--
 tests/unit/test_policies.py | 154 ++++++++++++++++++++++++++++++++++++
 2 files changed, 159 insertions(+), 5 deletions(-)

diff --git a/cassandra/policies.py b/cassandra/policies.py
index 0dc36af7..244df241 100644
--- a/cassandra/policies.py
+++ b/cassandra/policies.py
@@ -267,11 +267,11 @@ class DCAwareRoundRobinPolicy(LoadBalancingPolicy):
         for host in islice(cycle(local_live), pos, pos + len(local_live)):
             yield host
 
-        for dc, current_dc_hosts in six.iteritems(self._dc_live_hosts):
-            if dc == self.local_dc:
-                continue
-
-            for host in current_dc_hosts[:self.used_hosts_per_remote_dc]:
+        # the dict can change, so get candidate DCs iterating over keys of a copy
+        other_dcs = [dc for dc in self._dc_live_hosts.copy().keys() if dc != self.local_dc]
+        for dc in other_dcs:
+            remote_live = self._dc_live_hosts.get(dc, ())
+            for host in remote_live[:self.used_hosts_per_remote_dc]:
                 yield host
 
     def on_up(self, host):
diff --git a/tests/unit/test_policies.py b/tests/unit/test_policies.py
index c6f049e0..3e24f71f 100644
--- a/tests/unit/test_policies.py
+++ b/tests/unit/test_policies.py
@@ -292,6 +292,160 @@ class DCAwareRoundRobinPolicyTest(unittest.TestCase):
         qplan = list(policy.make_query_plan())
         self.assertEqual(qplan, [])
 
+    def test_modification_during_generation(self):
+        hosts = [Host(i, SimpleConvictionPolicy) for i in range(4)]
+        for h in hosts[:2]:
+            h.set_location_info("dc1", "rack1")
+        for h in hosts[2:]:
+            h.set_location_info("dc2", "rack1")
+
+        policy = DCAwareRoundRobinPolicy("dc1", used_hosts_per_remote_dc=3)
+        policy.populate(Mock(), hosts)
+
+        # The general concept here is to change the internal state of the
+        # policy during plan generation. In this case we use a grey-box
+        # approach that changes specific things during known phases of the
+        # generator.
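+        # The fix under test (PYTHON-297) iterates over a copy of the
+        # policy's DC -> hosts keys, so on_up()/on_down() calls made while
+        # the generator is suspended should not raise
+        # "RuntimeError: dictionary changed size during iteration".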
+
+        new_host = Host(4, SimpleConvictionPolicy)
+        new_host.set_location_info("dc1", "rack1")
+
+        # new local before iteration
+        plan = policy.make_query_plan()
+        policy.on_up(new_host)
+        # local list is not bound yet, so we get to see that one
+        self.assertEqual(len(list(plan)), 3 + 2)
+
+        # remove local before iteration
+        plan = policy.make_query_plan()
+        policy.on_down(new_host)
+        # local list is not bound yet, so we don't see it
+        self.assertEqual(len(list(plan)), 2 + 2)
+
+        # new local after starting iteration
+        plan = policy.make_query_plan()
+        next(plan)
+        policy.on_up(new_host)
+        # local list is bound, and one consumed, so we only see the other original
+        self.assertEqual(len(list(plan)), 1 + 2)
+
+        # remove local after traversing available
+        plan = policy.make_query_plan()
+        for _ in range(3):
+            next(plan)
+        policy.on_down(new_host)
+        # we should be past the local list
+        self.assertEqual(len(list(plan)), 0 + 2)
+
+        # REMOTES CHANGE
+        new_host.set_location_info("dc2", "rack1")
+
+        # new remote after traversing local, but not starting remote
+        plan = policy.make_query_plan()
+        for _ in range(2):
+            next(plan)
+        policy.on_up(new_host)
+        # list is updated before we get to it
+        self.assertEqual(len(list(plan)), 0 + 3)
+
+        # remove remote after traversing local, but not starting remote
+        plan = policy.make_query_plan()
+        for _ in range(2):
+            next(plan)
+        policy.on_down(new_host)
+        # list is updated before we get to it
+        self.assertEqual(len(list(plan)), 0 + 2)
+
+        # new remote after traversing local, and starting remote
+        plan = policy.make_query_plan()
+        for _ in range(3):
+            next(plan)
+        policy.on_up(new_host)
+        # slice is already made, and we've consumed one
+        self.assertEqual(len(list(plan)), 0 + 1)
+
+        # remove remote after traversing local, and starting remote
+        plan = policy.make_query_plan()
+        for _ in range(3):
+            next(plan)
+        policy.on_down(new_host)
+        # slice is created with all present, and we've consumed one
+        self.assertEqual(len(list(plan)), 0 + 2)
+
+        # local DC disappears after finishing it, but not starting remote
+        plan = policy.make_query_plan()
+        for _ in range(2):
+            next(plan)
+        policy.on_down(hosts[0])
+        policy.on_down(hosts[1])
+        # dict traversal starts as normal
+        self.assertEqual(len(list(plan)), 0 + 2)
+        policy.on_up(hosts[0])
+        policy.on_up(hosts[1])
+
+        # PYTHON-297 addresses the following cases, where DCs come and go
+        # during generation.
+        # local DC disappears after finishing it, and starting remote
+        plan = policy.make_query_plan()
+        for _ in range(3):
+            next(plan)
+        policy.on_down(hosts[0])
+        policy.on_down(hosts[1])
+        # dict traversal has begun and consumed one
+        self.assertEqual(len(list(plan)), 0 + 1)
+        policy.on_up(hosts[0])
+        policy.on_up(hosts[1])
+
+        # remote DC disappears after finishing local, but not starting remote
+        plan = policy.make_query_plan()
+        for _ in range(2):
+            next(plan)
+        policy.on_down(hosts[2])
+        policy.on_down(hosts[3])
+        # nothing left
+        self.assertEqual(len(list(plan)), 0 + 0)
+        policy.on_up(hosts[2])
+        policy.on_up(hosts[3])
+
+        # remote DC disappears while traversing it
+        plan = policy.make_query_plan()
+        for _ in range(3):
+            next(plan)
+        policy.on_down(hosts[2])
+        policy.on_down(hosts[3])
+        # we continue with remainder of original list
+        self.assertEqual(len(list(plan)), 0 + 1)
+        policy.on_up(hosts[2])
+        policy.on_up(hosts[3])
+
+        another_host = Host(5, SimpleConvictionPolicy)
+        another_host.set_location_info("dc3", "rack1")
+        new_host.set_location_info("dc3", "rack1")
+
+        # new DC while traversing remote
+        plan = policy.make_query_plan()
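+        # consume the two local hosts and the first dc2 host so the plan is
+        # part-way through the remote DCs when dc3 comes up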
+        for _ in range(3):
+            next(plan)
+        policy.on_up(new_host)
+        policy.on_up(another_host)
+        # we continue with remainder of original list
+        self.assertEqual(len(list(plan)), 0 + 1)
+
+        # remote DC disappears after finishing it
+        plan = policy.make_query_plan()
+        for _ in range(3):
+            next(plan)
+        last_host_in_this_dc = next(plan)
+        if last_host_in_this_dc in (new_host, another_host):
+            down_hosts = [new_host, another_host]
+        else:
+            down_hosts = hosts[2:]
+        for h in down_hosts:
+            policy.on_down(h)
+        # the last DC has two
+        self.assertEqual(len(list(plan)), 0 + 2)
+
     def test_no_live_nodes(self):
         """
         Ensure query plan for a downed cluster will execute without errors