From 0b9f0177f8273c0728b9f6724f5335f8638def87 Mon Sep 17 00:00:00 2001 From: Matt Dietz Date: Sun, 15 Feb 2015 09:49:19 +0000 Subject: [PATCH] Refactors RM11449 The agent now correctly caches and uses the VIFs the first time it pulls them from XAPI, rather than fetching them over and over. Additionally implements handling for ack'ing VIFs only if they actually tagged and applied their flows. --- quark/agent/agent.py | 37 ++++----- quark/agent/xapi.py | 45 +++++------ quark/cache/redis_base.py | 44 +++++------ quark/cache/security_groups_client.py | 18 +++-- quark/tests/agent/test_agent.py | 29 ++++--- quark/tests/agent/test_xapi.py | 76 ++++++++----------- quark/tests/cache/test_redis_base.py | 23 ------ .../cache/test_security_groups_client.py | 11 ++- 8 files changed, 124 insertions(+), 159 deletions(-) diff --git a/quark/agent/agent.py b/quark/agent/agent.py index a828e56..5bb0063 100644 --- a/quark/agent/agent.py +++ b/quark/agent/agent.py @@ -66,28 +66,22 @@ def partition_vifs(xapi_client, interfaces, security_group_states): updated = [] removed = [] - with xapi_client.sessioned() as session: - for vif in interfaces: - vif_tagged = xapi_client.is_vif_tagged(session, vif.ref) - if vif_tagged is None: - # Couldn't get this VIF, it likely disappeared on us - continue + for vif in interfaces: + vif_has_groups = vif in security_group_states + if vif.tagged and vif_has_groups and security_group_states[vif]: + # Already ack'd these groups and VIF is tagged, reapply. + # If it's not tagged, fall through and have it self-heal + continue - vif_has_groups = vif in security_group_states - if vif_tagged and vif_has_groups and security_group_states[vif]: - # Already ack'd these groups and VIF is tagged, reapply. - # If it's not tagged, fall through and have it self-heal - continue - - if vif_tagged: - if vif_has_groups: - updated.append(vif) - else: - removed.append(vif) + if vif.tagged: + if vif_has_groups: + updated.append(vif) else: - if vif_has_groups: - added.append(vif) - # if not tagged and no groups, skip + removed.append(vif) + else: + if vif_has_groups: + added.append(vif) + # if not tagged and no groups, skip return added, updated, removed @@ -125,7 +119,8 @@ def run(): interfaces, sg_states) xapi_client.update_interfaces(new_sg, updated_sg, removed_sg) - ack_groups(new_sg + updated_sg) + groups_to_ack = [v for v in new_sg + updated_sg if v.success] + ack_groups(groups_to_ack) except Exception: LOG.exception("Unable to get security groups from registry and " diff --git a/quark/agent/xapi.py b/quark/agent/xapi.py index b7d7692..57a6953 100644 --- a/quark/agent/xapi.py +++ b/quark/agent/xapi.py @@ -32,12 +32,14 @@ agent_opts = [ ] CONF.register_opts(agent_opts, "AGENT") +SECURITY_GROUPS_KEY = "security_groups" +VM = namedtuple('VM', ['ref', 'uuid', 'vifs', 'dom_id']) class VIF(object): SEPARATOR = "." - def __init__(self, device_id, mac_address, ref): + def __init__(self, device_id, record, ref): """Constructs VIF `device_id` and `mac_address` should be strings if they will later be @@ -47,19 +49,31 @@ class VIF(object): """ self.device_id = device_id - self.mac_address = mac_address + self.record = record self.ref = ref + self.success = False def __str__(self): return "%s%s%s%s%s" % (self.device_id, self.SEPARATOR, self.mac_address, self.SEPARATOR, self.ref) + @property + def mac_address(self): + return self.record["MAC"] + + @property + def tagged(self): + return self.record["other_config"].get(SECURITY_GROUPS_KEY) + @classmethod def from_string(cls, s): device_id, mac_address, ref = s.split(cls.SEPARATOR) return cls(device_id, mac_address, ref) + def succeed(self): + self.success = True + def __repr__(self): return "VIF(%r, %r, %r)" % (self.device_id, self.mac_address, self.ref) @@ -75,11 +89,7 @@ class VIF(object): return hash((self.device_id, self.mac_address)) -VM = namedtuple('VM', ['ref', 'uuid', 'vifs', 'dom_id']) - - class XapiClient(object): - SECURITY_GROUPS_KEY = "security_groups" SECURITY_GROUPS_VALUE = "enabled" def __init__(self): @@ -140,32 +150,16 @@ class XapiClient(object): if not vm: continue device_id = vm.uuid - interfaces.add(VIF(device_id, rec["MAC"], vif_ref)) + interfaces.add(VIF(device_id, rec, vif_ref)) return interfaces - def is_vif_tagged(self, session, vif): - try: - vif_rec = session.xenapi.VIF.get_record(vif) - if vif_rec["other_config"].get(self.SECURITY_GROUPS_KEY): - LOG.debug("VIF %s enabled for security groups" % - vif_rec["uuid"]) - return True - return False - except XenAPI.Failure: - # We shouldn't lose all of them because one failed - # An example of a continuable failure is the VIF was deleted - # in the (albeit very small) window between the initial fetch - # and here. - LOG.exception("Failed to enable security groups for VIF " - "with MAC %s" % vif.mac_address) - def _set_security_groups(self, session, interfaces): LOG.debug("Setting security groups on %s", interfaces) for vif in interfaces: try: session.xenapi.VIF.add_to_other_config( - vif.ref, self.SECURITY_GROUPS_KEY, + vif.ref, SECURITY_GROUPS_KEY, self.SECURITY_GROUPS_VALUE) except XenAPI.Failure: # We shouldn't lose all of them because one failed @@ -182,7 +176,7 @@ class XapiClient(object): try: session.xenapi.VIF.remove_from_other_config( vif.ref, - self.SECURITY_GROUPS_KEY) + SECURITY_GROUPS_KEY) except XenAPI.Failure: # NOTE(mdietz): RM11399 - removing a parameter that doesn't # exist is idempotent. Trying to remove it @@ -201,6 +195,7 @@ class XapiClient(object): vm_rec = session.xenapi.VM.get_record(vif_rec["VM"]) vif_index = vif_rec["device"] dom_id = vm_rec["domid"] + vif.succeed() except XenAPI.Failure: LOG.exception("Failure when looking up VMs or VIFs") continue diff --git a/quark/cache/redis_base.py b/quark/cache/redis_base.py index 5691fc0..a898a1c 100644 --- a/quark/cache/redis_base.py +++ b/quark/cache/redis_base.py @@ -89,52 +89,49 @@ class ClientBase(object): def __init__(self, use_master=False): self._use_master = use_master - try: if CONF.QUARK.redis_use_sentinels: self._compile_sentinel_list() - self._ensure_connection_pool_exists() + self._ensure_connection_pools_exist() self._client = self._client() except redis.ConnectionError as e: LOG.exception(e) raise q_exc.RedisConnectionFailure() - def _prepare_pool_connection(self): + def _prepare_pool_connection(self, master): LOG.info("Creating redis connection pool for the first time...") host = CONF.QUARK.redis_host port = CONF.QUARK.redis_port + connect_args = [] connect_kw = {} if CONF.QUARK.redis_password: connect_kw["password"] = CONF.QUARK.redis_password - connect_args = [] - - klass = redis.ConnectionPool if CONF.QUARK.redis_use_sentinels: - connect_args.append(CONF.QUARK.redis_sentinel_master) - klass = redis.sentinel.SentinelConnectionPool - connect_args.append( - redis.sentinel.Sentinel(self._sentinel_list)) - connect_kw["check_connection"] = True - connect_kw["is_master"] = self._use_master LOG.info("Using redis sentinel connections %s" % self._sentinel_list) + klass = redis.sentinel.SentinelConnectionPool + connect_args.append(CONF.QUARK.redis_sentinel_master) + connect_args.append(redis.sentinel.Sentinel(self._sentinel_list)) + connect_kw["check_connection"] = True + connect_kw["is_master"] = master else: + LOG.info("Using redis host %s:%s" % (host, port)) + klass = redis.ConnectionPool connect_kw["host"] = host connect_kw["port"] = port - LOG.info("Using redis host %s:%s" % (host, port)) + return klass, connect_args, connect_kw - def _ensure_connection_pool_exists(self): - if not self._use_master and not ClientBase.read_connection_pool: - klass, connect_args, connect_kw = self._prepare_pool_connection() - ClientBase.read_connection_pool = klass(*connect_args, - **connect_kw) - elif self._use_master and not ClientBase.write_connection_pool: - klass, connect_args, connect_kw = self._prepare_pool_connection() - ClientBase.write_connection_pool = klass(*connect_args, - **connect_kw) + def _ensure_connection_pools_exist(self): + if not (ClientBase.write_connection_pool or + ClientBase.read_connection_pool): + klass, args, kwargs = self._prepare_pool_connection(master=False) + ClientBase.read_connection_pool = klass(*args, **kwargs) + + klass, args, kwargs = self._prepare_pool_connection(master=True) + ClientBase.write_connection_pool = klass(*args, **kwargs) def _compile_sentinel_list(self): self._sentinel_list = [tuple(host.split(':')) @@ -144,10 +141,9 @@ class ClientBase(object): "list of 'host:port' pairs") def _client(self): + pool = ClientBase.read_connection_pool if self._use_master: pool = ClientBase.write_connection_pool - else: - pool = ClientBase.read_connection_pool kwargs = {"connection_pool": pool, "db": CONF.QUARK.redis_db, diff --git a/quark/cache/security_groups_client.py b/quark/cache/security_groups_client.py index 5dca294..c825caf 100644 --- a/quark/cache/security_groups_client.py +++ b/quark/cache/security_groups_client.py @@ -169,6 +169,12 @@ class SecurityGroupsClient(redis_base.ClientBase): Returns a dictionary of xapi.VIFs with values of the current acknowledged status in Redis. + + States not explicitly handled: + * ack key, no rules - This is the same as just tagging the VIF, + the instance will be inaccessible + * rules key, no ack - Nothing will happen, the VIF will + not be tagged. """ LOG.debug("Getting security groups from Redis for {0}".format( interfaces)) @@ -179,15 +185,15 @@ class SecurityGroupsClient(redis_base.ClientBase): security_groups = self.get_fields(vif_keys, SECURITY_GROUP_ACK) ret = {} - for vif, security_group in zip(interfaces, security_groups): - if security_group: - security_group = security_group.lower() - if "true" in security_group: + for vif, security_group_ack in zip(interfaces, security_groups): + if security_group_ack: + security_group_ack = security_group_ack.lower() + if "true" in security_group_ack: ret[vif] = True - elif "false" in security_group: + elif "false" in security_group_ack: ret[vif] = False else: - LOG.debug("Skipping bad ack value %s" % security_group) + LOG.debug("Skipping bad ack value %s" % security_group_ack) return ret @utils.retry_loop(3) diff --git a/quark/tests/agent/test_agent.py b/quark/tests/agent/test_agent.py index c074e7a..26fa841 100644 --- a/quark/tests/agent/test_agent.py +++ b/quark/tests/agent/test_agent.py @@ -22,26 +22,31 @@ from quark.tests import test_base class TestAgentPartitionVifs(test_base.TestBase): @mock.patch("quark.agent.xapi.XapiClient._session") - @mock.patch("quark.agent.xapi.XapiClient.is_vif_tagged") - def test_partition_vifs(self, is_vif_tagged, sess): - interfaces = [xapi.VIF("added", 1, "added_ref"), - xapi.VIF("updated", 2, "updated_ref"), - xapi.VIF("removed", 3, "removed_ref"), - xapi.VIF("no groups", 4, "no groups ref"), - xapi.VIF("not found", 5, "not found ref"), - xapi.VIF("self heal", 6, "self heal ref")] + def test_partition_vifs(self, sess): + def _vif_rec(mac, tagged): + rec = {"MAC": mac, "other_config": {}} + if tagged: + rec["other_config"] = {"security_groups": "enabled"} + return rec + + vif_recs = [_vif_rec(1, False), _vif_rec(2, True), _vif_rec(3, True), + _vif_rec(4, False), _vif_rec(5, False)] + + interfaces = [xapi.VIF("added", vif_recs[0], "added_ref"), + xapi.VIF("updated", vif_recs[1], "updated_ref"), + xapi.VIF("removed", vif_recs[2], "removed_ref"), + xapi.VIF("no groups", vif_recs[3], "no groups ref"), + xapi.VIF("self heal", vif_recs[4], "self heal ref")] sg_states = {interfaces[0]: False, interfaces[1]: False, - interfaces[5]: True} + interfaces[4]: True} xapi_client = xapi.XapiClient() - is_vif_tagged.side_effect = [False, True, True, False, None, - False] added, updated, removed = agent.partition_vifs(xapi_client, interfaces, sg_states) - self.assertEqual(added, [interfaces[0], interfaces[5]]) + self.assertEqual(added, [interfaces[0], interfaces[4]]) self.assertEqual(updated, [interfaces[1]]) self.assertEqual(removed, [interfaces[2]]) diff --git a/quark/tests/agent/test_xapi.py b/quark/tests/agent/test_xapi.py index 6a5b368..5dca929 100644 --- a/quark/tests/agent/test_xapi.py +++ b/quark/tests/agent/test_xapi.py @@ -8,27 +8,28 @@ import mock class TestVIF(test_base.TestBase): def test_str(self): - self.assertEqual(str(xapi.VIF("1", "2", "3")), "1.2.3") + rec = {"MAC": 2} + self.assertEqual(str(xapi.VIF("1", rec, "3")), "1.2.3") def test_repr(self): - self.assertEqual(repr(xapi.VIF("1", "2", "3")), "VIF('1', '2', '3')") + rec = {"MAC": '2'} + self.assertEqual(repr(xapi.VIF("1", rec, "3")), "VIF('1', '2', '3')") def test_eq(self): - self.assertEqual(xapi.VIF("1", "2", "3"), xapi.VIF("1", "2", "3")) + rec = {"MAC": '2'} + self.assertEqual(xapi.VIF("1", rec, "3"), xapi.VIF("1", rec, "3")) def test_ne(self): - self.assertNotEqual(xapi.VIF("1", "2", "4"), xapi.VIF("1", "3", "4")) - self.assertNotEqual(xapi.VIF("1", "2", "4"), xapi.VIF("3", "2", "4")) - self.assertNotEqual(xapi.VIF("1", "2", "4"), xapi.VIF("3", "4", "4")) + rec1 = {"MAC": '2'} + rec2 = {"MAC": '3'} + vif1 = xapi.VIF("1", rec1, "4") + vif2 = xapi.VIF("1", rec2, "4") + vif3 = xapi.VIF("3", rec1, "4") + vif4 = xapi.VIF("3", rec2, "4") - def test_hashable(self): - self.assertEqual( - tuple(set([xapi.VIF("1", "2", "3"), xapi.VIF("1", "2", "3")])), - (xapi.VIF("1", "2", "3"),)) - - def test_from_string(self): - self.assertEqual(xapi.VIF.from_string("1.2.3"), - xapi.VIF("1", "2", "3")) + self.assertNotEqual(vif1, vif2) + self.assertNotEqual(vif1, vif3) + self.assertNotEqual(vif1, vif4) class TestXapiClient(test_base.TestBase): @@ -70,21 +71,19 @@ class TestXapiClient(test_base.TestBase): "opaque_vif2": {"VM": "opaque2", "MAC": "55:44:33:22:11:00"}, } interfaces = self.xclient.get_interfaces() - self.assertEqual(interfaces, - set([xapi.VIF("device_id1", "00:11:22:33:44:55", - "opaque_vif1")])) + rec = {"MAC": "00:11:22:33:44:55", "VM": "opaque1"} + expected = set([xapi.VIF("device_id1", rec, "opaque_vif1")]) + self.assertEqual(interfaces, expected) @mock.patch("quark.agent.xapi.XapiClient.get_instances") - @mock.patch("quark.agent.xapi.XapiClient.is_vif_tagged") - def test_update_interfaces_added(self, record_exist, get_instances): - record_exist.side_effect = [False, True] - + def test_update_interfaces_added(self, get_instances): instances = {"opaque1": xapi.VM(uuid="device_id1", ref="opaque1", vifs=["opaque_vif1"], dom_id="1")} get_instances.return_value = instances - interfaces = [xapi.VIF("device_id1", "00:11:22:33:44:55", + rec = {"MAC": "00:11:22:33:44:55"} + interfaces = [xapi.VIF("device_id1", rec, "opaque_vif1")] dom_id = "1" @@ -108,10 +107,9 @@ class TestXapiClient(test_base.TestBase): "neutron_vif_flow", "online_instance_flows", expected_args) - @mock.patch("quark.agent.xapi.XapiClient.is_vif_tagged") - def test_update_interfaces_added_vm_removed(self, record_exist): - record_exist.side_effect = [False, True] - interfaces = [xapi.VIF("device_id1", "00:11:22:33:44:55", + def test_update_interfaces_added_vm_removed(self): + rec = {"MAC": "00:11:22:33:44:55"} + interfaces = [xapi.VIF("device_id1", rec, "opaque_vif1")] vif_index = "0" @@ -131,7 +129,8 @@ class TestXapiClient(test_base.TestBase): self.assertEqual(self.session.xenapi.host.call_plugin.call_count, 0) def test_update_interfaces_updated(self): - interfaces = [xapi.VIF("device_id1", "00:11:22:33:44:55", + rec = {"MAC": "00:11:22:33:44:55"} + interfaces = [xapi.VIF("device_id1", rec, "opaque_vif1")] dom_id = "1" @@ -157,7 +156,8 @@ class TestXapiClient(test_base.TestBase): expected_args) def test_update_interfaces_removed(self): - interfaces = [xapi.VIF("device_id1", "00:11:22:33:44:55", + rec = {"MAC": "00:11:22:33:44:55"} + interfaces = [xapi.VIF("device_id1", rec, "opaque_vif1")] dom_id = "1" @@ -182,7 +182,8 @@ class TestXapiClient(test_base.TestBase): expected_args) def test_update_interfaces_removed_vm_removed(self): - interfaces = [xapi.VIF("device_id1", "00:11:22:33:44:55", + rec = {"MAC": "00:11:22:33:44:55"} + interfaces = [xapi.VIF("device_id1", rec, "opaque_vif1")] self.session.xenapi.VIF.get_record.side_effect = XenAPI.Failure( @@ -207,22 +208,9 @@ class TestXapiClient(test_base.TestBase): self.assertEqual(self.session.xenapi.host.call_plugin.call_count, 0) - def test_is_vif_tagged(self): - self.session.xenapi.VIF.get_record.return_value = { - 'other_config': {}, - 'uuid': '2d865be2-f626-d89f-6c91-7bd8fb521a3a' - } - self.assertFalse(self.xclient.is_vif_tagged(self.session, "vif")) - - def test_is_vif_tagged_record(self): - self.session.xenapi.VIF.get_record.return_value = { - 'other_config': {"security_groups": {"enabled": True}}, - 'uuid': '2d865be2-f626-d89f-6c91-7bd8fb521a3a' - } - self.assertTrue(self.xclient.is_vif_tagged(self.session, "vif")) - def test_update_interfaces_removed_raises(self): - interfaces = [xapi.VIF("device_id1", "00:11:22:33:44:55", + rec = {"MAC": "00:11:22:33:44:55"} + interfaces = [xapi.VIF("device_id1", rec, "opaque_vif1")] dom_id = "1" diff --git a/quark/tests/cache/test_redis_base.py b/quark/tests/cache/test_redis_base.py index 3a54d17..79b689c 100644 --- a/quark/tests/cache/test_redis_base.py +++ b/quark/tests/cache/test_redis_base.py @@ -54,29 +54,6 @@ class TestClientBase(test_base.TestBase): port = 6379 redis_base.ClientBase() conn_pool.assert_called_with(host=host, port=port) - self.assertIsNotNone(redis_base.ClientBase.read_connection_pool) - self.assertIsNone(redis_base.ClientBase.write_connection_pool) - - @mock.patch("redis.ConnectionPool") - @mock.patch("quark.cache.redis_base.redis.StrictRedis") - def test_init_master(self, strict_redis, conn_pool): - host = "127.0.0.1" - port = 6379 - redis_base.ClientBase(use_master=True) - conn_pool.assert_called_with(host=host, port=port) - self.assertIsNone(redis_base.ClientBase.read_connection_pool) - self.assertIsNotNone(redis_base.ClientBase.write_connection_pool) - - @mock.patch("redis.ConnectionPool") - @mock.patch("quark.cache.redis_base.redis.StrictRedis") - def test_init_both(self, strict_redis, conn_pool): - host = "127.0.0.1" - port = 6379 - redis_base.ClientBase() - redis_base.ClientBase(use_master=True) - - conn_pool.assert_called_with(host=host, port=port) - self.assertIsNotNone(redis_base.ClientBase.read_connection_pool) self.assertIsNotNone(redis_base.ClientBase.write_connection_pool) diff --git a/quark/tests/cache/test_security_groups_client.py b/quark/tests/cache/test_security_groups_client.py index 905ce08..d58036e 100644 --- a/quark/tests/cache/test_security_groups_client.py +++ b/quark/tests/cache/test_security_groups_client.py @@ -310,8 +310,10 @@ class TestRedisForAgent(test_base.TestBase): '{"%s": True}' % sg_client.SECURITY_GROUP_ACK, '{"%s": "1-2-3"}' % sg_client.SECURITY_GROUP_ACK] - new_interfaces = ([VIF(1, 2, 9), VIF(3, 4, 0), VIF(5, 6, 1), - VIF(7, 8, 2), VIF(9, 0, 3)]) + recs = [{"MAC": 2}, {"MAC": 4}, {"MAC": 6}, {"MAC": 8}, {"MAC": 0}] + new_interfaces = ([VIF(1, recs[0], 9), VIF(3, recs[1], 0), + VIF(5, recs[2], 1), VIF(7, recs[3], 2), + VIF(9, recs[4], 3)]) group_states = rc.get_security_group_states(new_interfaces) @@ -319,5 +321,6 @@ class TestRedisForAgent(test_base.TestBase): ["1.000000000002", "3.000000000004", "5.000000000006", "7.000000000008", "9.000000000000"], sg_client.SECURITY_GROUP_ACK) - self.assertEqual(group_states, {VIF(5, 6, 1): False, - VIF(7, 8, 2): True}) + + self.assertEqual(group_states, {new_interfaces[2]: False, + new_interfaces[3]: True})