Refactors RM11449

The agent now correctly caches and uses the VIFs the first time it pulls
them from XAPI, rather than fetching them over and over. Additionally
implements handling for ack'ing VIFs only if they actually tagged and
applied their flows.
This commit is contained in:
Matt Dietz
2015-02-15 09:49:19 +00:00
parent b0f3175878
commit 0b9f0177f8
8 changed files with 124 additions and 159 deletions

View File

@@ -66,20 +66,14 @@ def partition_vifs(xapi_client, interfaces, security_group_states):
updated = []
removed = []
with xapi_client.sessioned() as session:
for vif in interfaces:
vif_tagged = xapi_client.is_vif_tagged(session, vif.ref)
if vif_tagged is None:
# Couldn't get this VIF, it likely disappeared on us
continue
vif_has_groups = vif in security_group_states
if vif_tagged and vif_has_groups and security_group_states[vif]:
if vif.tagged and vif_has_groups and security_group_states[vif]:
# Already ack'd these groups and VIF is tagged, reapply.
# If it's not tagged, fall through and have it self-heal
continue
if vif_tagged:
if vif.tagged:
if vif_has_groups:
updated.append(vif)
else:
@@ -125,7 +119,8 @@ def run():
interfaces,
sg_states)
xapi_client.update_interfaces(new_sg, updated_sg, removed_sg)
ack_groups(new_sg + updated_sg)
groups_to_ack = [v for v in new_sg + updated_sg if v.success]
ack_groups(groups_to_ack)
except Exception:
LOG.exception("Unable to get security groups from registry and "

View File

@@ -32,12 +32,14 @@ agent_opts = [
]
CONF.register_opts(agent_opts, "AGENT")
SECURITY_GROUPS_KEY = "security_groups"
VM = namedtuple('VM', ['ref', 'uuid', 'vifs', 'dom_id'])
class VIF(object):
SEPARATOR = "."
def __init__(self, device_id, mac_address, ref):
def __init__(self, device_id, record, ref):
"""Constructs VIF
`device_id` and `mac_address` should be strings if they will later be
@@ -47,19 +49,31 @@ class VIF(object):
"""
self.device_id = device_id
self.mac_address = mac_address
self.record = record
self.ref = ref
self.success = False
def __str__(self):
return "%s%s%s%s%s" % (self.device_id, self.SEPARATOR,
self.mac_address, self.SEPARATOR,
self.ref)
@property
def mac_address(self):
return self.record["MAC"]
@property
def tagged(self):
return self.record["other_config"].get(SECURITY_GROUPS_KEY)
@classmethod
def from_string(cls, s):
device_id, mac_address, ref = s.split(cls.SEPARATOR)
return cls(device_id, mac_address, ref)
def succeed(self):
self.success = True
def __repr__(self):
return "VIF(%r, %r, %r)" % (self.device_id, self.mac_address,
self.ref)
@@ -75,11 +89,7 @@ class VIF(object):
return hash((self.device_id, self.mac_address))
VM = namedtuple('VM', ['ref', 'uuid', 'vifs', 'dom_id'])
class XapiClient(object):
SECURITY_GROUPS_KEY = "security_groups"
SECURITY_GROUPS_VALUE = "enabled"
def __init__(self):
@@ -140,32 +150,16 @@ class XapiClient(object):
if not vm:
continue
device_id = vm.uuid
interfaces.add(VIF(device_id, rec["MAC"], vif_ref))
interfaces.add(VIF(device_id, rec, vif_ref))
return interfaces
def is_vif_tagged(self, session, vif):
try:
vif_rec = session.xenapi.VIF.get_record(vif)
if vif_rec["other_config"].get(self.SECURITY_GROUPS_KEY):
LOG.debug("VIF %s enabled for security groups" %
vif_rec["uuid"])
return True
return False
except XenAPI.Failure:
# We shouldn't lose all of them because one failed
# An example of a continuable failure is the VIF was deleted
# in the (albeit very small) window between the initial fetch
# and here.
LOG.exception("Failed to enable security groups for VIF "
"with MAC %s" % vif.mac_address)
def _set_security_groups(self, session, interfaces):
LOG.debug("Setting security groups on %s", interfaces)
for vif in interfaces:
try:
session.xenapi.VIF.add_to_other_config(
vif.ref, self.SECURITY_GROUPS_KEY,
vif.ref, SECURITY_GROUPS_KEY,
self.SECURITY_GROUPS_VALUE)
except XenAPI.Failure:
# We shouldn't lose all of them because one failed
@@ -182,7 +176,7 @@ class XapiClient(object):
try:
session.xenapi.VIF.remove_from_other_config(
vif.ref,
self.SECURITY_GROUPS_KEY)
SECURITY_GROUPS_KEY)
except XenAPI.Failure:
# NOTE(mdietz): RM11399 - removing a parameter that doesn't
# exist is idempotent. Trying to remove it
@@ -201,6 +195,7 @@ class XapiClient(object):
vm_rec = session.xenapi.VM.get_record(vif_rec["VM"])
vif_index = vif_rec["device"]
dom_id = vm_rec["domid"]
vif.succeed()
except XenAPI.Failure:
LOG.exception("Failure when looking up VMs or VIFs")
continue

View File

@@ -89,52 +89,49 @@ class ClientBase(object):
def __init__(self, use_master=False):
self._use_master = use_master
try:
if CONF.QUARK.redis_use_sentinels:
self._compile_sentinel_list()
self._ensure_connection_pool_exists()
self._ensure_connection_pools_exist()
self._client = self._client()
except redis.ConnectionError as e:
LOG.exception(e)
raise q_exc.RedisConnectionFailure()
def _prepare_pool_connection(self):
def _prepare_pool_connection(self, master):
LOG.info("Creating redis connection pool for the first time...")
host = CONF.QUARK.redis_host
port = CONF.QUARK.redis_port
connect_args = []
connect_kw = {}
if CONF.QUARK.redis_password:
connect_kw["password"] = CONF.QUARK.redis_password
connect_args = []
klass = redis.ConnectionPool
if CONF.QUARK.redis_use_sentinels:
connect_args.append(CONF.QUARK.redis_sentinel_master)
klass = redis.sentinel.SentinelConnectionPool
connect_args.append(
redis.sentinel.Sentinel(self._sentinel_list))
connect_kw["check_connection"] = True
connect_kw["is_master"] = self._use_master
LOG.info("Using redis sentinel connections %s" %
self._sentinel_list)
klass = redis.sentinel.SentinelConnectionPool
connect_args.append(CONF.QUARK.redis_sentinel_master)
connect_args.append(redis.sentinel.Sentinel(self._sentinel_list))
connect_kw["check_connection"] = True
connect_kw["is_master"] = master
else:
LOG.info("Using redis host %s:%s" % (host, port))
klass = redis.ConnectionPool
connect_kw["host"] = host
connect_kw["port"] = port
LOG.info("Using redis host %s:%s" % (host, port))
return klass, connect_args, connect_kw
def _ensure_connection_pool_exists(self):
if not self._use_master and not ClientBase.read_connection_pool:
klass, connect_args, connect_kw = self._prepare_pool_connection()
ClientBase.read_connection_pool = klass(*connect_args,
**connect_kw)
elif self._use_master and not ClientBase.write_connection_pool:
klass, connect_args, connect_kw = self._prepare_pool_connection()
ClientBase.write_connection_pool = klass(*connect_args,
**connect_kw)
def _ensure_connection_pools_exist(self):
if not (ClientBase.write_connection_pool or
ClientBase.read_connection_pool):
klass, args, kwargs = self._prepare_pool_connection(master=False)
ClientBase.read_connection_pool = klass(*args, **kwargs)
klass, args, kwargs = self._prepare_pool_connection(master=True)
ClientBase.write_connection_pool = klass(*args, **kwargs)
def _compile_sentinel_list(self):
self._sentinel_list = [tuple(host.split(':'))
@@ -144,10 +141,9 @@ class ClientBase(object):
"list of 'host:port' pairs")
def _client(self):
pool = ClientBase.read_connection_pool
if self._use_master:
pool = ClientBase.write_connection_pool
else:
pool = ClientBase.read_connection_pool
kwargs = {"connection_pool": pool,
"db": CONF.QUARK.redis_db,

View File

@@ -169,6 +169,12 @@ class SecurityGroupsClient(redis_base.ClientBase):
Returns a dictionary of xapi.VIFs with values of the current
acknowledged status in Redis.
States not explicitly handled:
* ack key, no rules - This is the same as just tagging the VIF,
the instance will be inaccessible
* rules key, no ack - Nothing will happen, the VIF will
not be tagged.
"""
LOG.debug("Getting security groups from Redis for {0}".format(
interfaces))
@@ -179,15 +185,15 @@ class SecurityGroupsClient(redis_base.ClientBase):
security_groups = self.get_fields(vif_keys, SECURITY_GROUP_ACK)
ret = {}
for vif, security_group in zip(interfaces, security_groups):
if security_group:
security_group = security_group.lower()
if "true" in security_group:
for vif, security_group_ack in zip(interfaces, security_groups):
if security_group_ack:
security_group_ack = security_group_ack.lower()
if "true" in security_group_ack:
ret[vif] = True
elif "false" in security_group:
elif "false" in security_group_ack:
ret[vif] = False
else:
LOG.debug("Skipping bad ack value %s" % security_group)
LOG.debug("Skipping bad ack value %s" % security_group_ack)
return ret
@utils.retry_loop(3)

View File

@@ -22,26 +22,31 @@ from quark.tests import test_base
class TestAgentPartitionVifs(test_base.TestBase):
@mock.patch("quark.agent.xapi.XapiClient._session")
@mock.patch("quark.agent.xapi.XapiClient.is_vif_tagged")
def test_partition_vifs(self, is_vif_tagged, sess):
interfaces = [xapi.VIF("added", 1, "added_ref"),
xapi.VIF("updated", 2, "updated_ref"),
xapi.VIF("removed", 3, "removed_ref"),
xapi.VIF("no groups", 4, "no groups ref"),
xapi.VIF("not found", 5, "not found ref"),
xapi.VIF("self heal", 6, "self heal ref")]
def test_partition_vifs(self, sess):
def _vif_rec(mac, tagged):
rec = {"MAC": mac, "other_config": {}}
if tagged:
rec["other_config"] = {"security_groups": "enabled"}
return rec
vif_recs = [_vif_rec(1, False), _vif_rec(2, True), _vif_rec(3, True),
_vif_rec(4, False), _vif_rec(5, False)]
interfaces = [xapi.VIF("added", vif_recs[0], "added_ref"),
xapi.VIF("updated", vif_recs[1], "updated_ref"),
xapi.VIF("removed", vif_recs[2], "removed_ref"),
xapi.VIF("no groups", vif_recs[3], "no groups ref"),
xapi.VIF("self heal", vif_recs[4], "self heal ref")]
sg_states = {interfaces[0]: False, interfaces[1]: False,
interfaces[5]: True}
interfaces[4]: True}
xapi_client = xapi.XapiClient()
is_vif_tagged.side_effect = [False, True, True, False, None,
False]
added, updated, removed = agent.partition_vifs(xapi_client,
interfaces,
sg_states)
self.assertEqual(added, [interfaces[0], interfaces[5]])
self.assertEqual(added, [interfaces[0], interfaces[4]])
self.assertEqual(updated, [interfaces[1]])
self.assertEqual(removed, [interfaces[2]])

View File

@@ -8,27 +8,28 @@ import mock
class TestVIF(test_base.TestBase):
def test_str(self):
self.assertEqual(str(xapi.VIF("1", "2", "3")), "1.2.3")
rec = {"MAC": 2}
self.assertEqual(str(xapi.VIF("1", rec, "3")), "1.2.3")
def test_repr(self):
self.assertEqual(repr(xapi.VIF("1", "2", "3")), "VIF('1', '2', '3')")
rec = {"MAC": '2'}
self.assertEqual(repr(xapi.VIF("1", rec, "3")), "VIF('1', '2', '3')")
def test_eq(self):
self.assertEqual(xapi.VIF("1", "2", "3"), xapi.VIF("1", "2", "3"))
rec = {"MAC": '2'}
self.assertEqual(xapi.VIF("1", rec, "3"), xapi.VIF("1", rec, "3"))
def test_ne(self):
self.assertNotEqual(xapi.VIF("1", "2", "4"), xapi.VIF("1", "3", "4"))
self.assertNotEqual(xapi.VIF("1", "2", "4"), xapi.VIF("3", "2", "4"))
self.assertNotEqual(xapi.VIF("1", "2", "4"), xapi.VIF("3", "4", "4"))
rec1 = {"MAC": '2'}
rec2 = {"MAC": '3'}
vif1 = xapi.VIF("1", rec1, "4")
vif2 = xapi.VIF("1", rec2, "4")
vif3 = xapi.VIF("3", rec1, "4")
vif4 = xapi.VIF("3", rec2, "4")
def test_hashable(self):
self.assertEqual(
tuple(set([xapi.VIF("1", "2", "3"), xapi.VIF("1", "2", "3")])),
(xapi.VIF("1", "2", "3"),))
def test_from_string(self):
self.assertEqual(xapi.VIF.from_string("1.2.3"),
xapi.VIF("1", "2", "3"))
self.assertNotEqual(vif1, vif2)
self.assertNotEqual(vif1, vif3)
self.assertNotEqual(vif1, vif4)
class TestXapiClient(test_base.TestBase):
@@ -70,21 +71,19 @@ class TestXapiClient(test_base.TestBase):
"opaque_vif2": {"VM": "opaque2", "MAC": "55:44:33:22:11:00"},
}
interfaces = self.xclient.get_interfaces()
self.assertEqual(interfaces,
set([xapi.VIF("device_id1", "00:11:22:33:44:55",
"opaque_vif1")]))
rec = {"MAC": "00:11:22:33:44:55", "VM": "opaque1"}
expected = set([xapi.VIF("device_id1", rec, "opaque_vif1")])
self.assertEqual(interfaces, expected)
@mock.patch("quark.agent.xapi.XapiClient.get_instances")
@mock.patch("quark.agent.xapi.XapiClient.is_vif_tagged")
def test_update_interfaces_added(self, record_exist, get_instances):
record_exist.side_effect = [False, True]
def test_update_interfaces_added(self, get_instances):
instances = {"opaque1": xapi.VM(uuid="device_id1",
ref="opaque1",
vifs=["opaque_vif1"],
dom_id="1")}
get_instances.return_value = instances
interfaces = [xapi.VIF("device_id1", "00:11:22:33:44:55",
rec = {"MAC": "00:11:22:33:44:55"}
interfaces = [xapi.VIF("device_id1", rec,
"opaque_vif1")]
dom_id = "1"
@@ -108,10 +107,9 @@ class TestXapiClient(test_base.TestBase):
"neutron_vif_flow", "online_instance_flows",
expected_args)
@mock.patch("quark.agent.xapi.XapiClient.is_vif_tagged")
def test_update_interfaces_added_vm_removed(self, record_exist):
record_exist.side_effect = [False, True]
interfaces = [xapi.VIF("device_id1", "00:11:22:33:44:55",
def test_update_interfaces_added_vm_removed(self):
rec = {"MAC": "00:11:22:33:44:55"}
interfaces = [xapi.VIF("device_id1", rec,
"opaque_vif1")]
vif_index = "0"
@@ -131,7 +129,8 @@ class TestXapiClient(test_base.TestBase):
self.assertEqual(self.session.xenapi.host.call_plugin.call_count, 0)
def test_update_interfaces_updated(self):
interfaces = [xapi.VIF("device_id1", "00:11:22:33:44:55",
rec = {"MAC": "00:11:22:33:44:55"}
interfaces = [xapi.VIF("device_id1", rec,
"opaque_vif1")]
dom_id = "1"
@@ -157,7 +156,8 @@ class TestXapiClient(test_base.TestBase):
expected_args)
def test_update_interfaces_removed(self):
interfaces = [xapi.VIF("device_id1", "00:11:22:33:44:55",
rec = {"MAC": "00:11:22:33:44:55"}
interfaces = [xapi.VIF("device_id1", rec,
"opaque_vif1")]
dom_id = "1"
@@ -182,7 +182,8 @@ class TestXapiClient(test_base.TestBase):
expected_args)
def test_update_interfaces_removed_vm_removed(self):
interfaces = [xapi.VIF("device_id1", "00:11:22:33:44:55",
rec = {"MAC": "00:11:22:33:44:55"}
interfaces = [xapi.VIF("device_id1", rec,
"opaque_vif1")]
self.session.xenapi.VIF.get_record.side_effect = XenAPI.Failure(
@@ -207,22 +208,9 @@ class TestXapiClient(test_base.TestBase):
self.assertEqual(self.session.xenapi.host.call_plugin.call_count, 0)
def test_is_vif_tagged(self):
self.session.xenapi.VIF.get_record.return_value = {
'other_config': {},
'uuid': '2d865be2-f626-d89f-6c91-7bd8fb521a3a'
}
self.assertFalse(self.xclient.is_vif_tagged(self.session, "vif"))
def test_is_vif_tagged_record(self):
self.session.xenapi.VIF.get_record.return_value = {
'other_config': {"security_groups": {"enabled": True}},
'uuid': '2d865be2-f626-d89f-6c91-7bd8fb521a3a'
}
self.assertTrue(self.xclient.is_vif_tagged(self.session, "vif"))
def test_update_interfaces_removed_raises(self):
interfaces = [xapi.VIF("device_id1", "00:11:22:33:44:55",
rec = {"MAC": "00:11:22:33:44:55"}
interfaces = [xapi.VIF("device_id1", rec,
"opaque_vif1")]
dom_id = "1"

View File

@@ -54,29 +54,6 @@ class TestClientBase(test_base.TestBase):
port = 6379
redis_base.ClientBase()
conn_pool.assert_called_with(host=host, port=port)
self.assertIsNotNone(redis_base.ClientBase.read_connection_pool)
self.assertIsNone(redis_base.ClientBase.write_connection_pool)
@mock.patch("redis.ConnectionPool")
@mock.patch("quark.cache.redis_base.redis.StrictRedis")
def test_init_master(self, strict_redis, conn_pool):
host = "127.0.0.1"
port = 6379
redis_base.ClientBase(use_master=True)
conn_pool.assert_called_with(host=host, port=port)
self.assertIsNone(redis_base.ClientBase.read_connection_pool)
self.assertIsNotNone(redis_base.ClientBase.write_connection_pool)
@mock.patch("redis.ConnectionPool")
@mock.patch("quark.cache.redis_base.redis.StrictRedis")
def test_init_both(self, strict_redis, conn_pool):
host = "127.0.0.1"
port = 6379
redis_base.ClientBase()
redis_base.ClientBase(use_master=True)
conn_pool.assert_called_with(host=host, port=port)
self.assertIsNotNone(redis_base.ClientBase.read_connection_pool)
self.assertIsNotNone(redis_base.ClientBase.write_connection_pool)

View File

@@ -310,8 +310,10 @@ class TestRedisForAgent(test_base.TestBase):
'{"%s": True}' % sg_client.SECURITY_GROUP_ACK,
'{"%s": "1-2-3"}' % sg_client.SECURITY_GROUP_ACK]
new_interfaces = ([VIF(1, 2, 9), VIF(3, 4, 0), VIF(5, 6, 1),
VIF(7, 8, 2), VIF(9, 0, 3)])
recs = [{"MAC": 2}, {"MAC": 4}, {"MAC": 6}, {"MAC": 8}, {"MAC": 0}]
new_interfaces = ([VIF(1, recs[0], 9), VIF(3, recs[1], 0),
VIF(5, recs[2], 1), VIF(7, recs[3], 2),
VIF(9, recs[4], 3)])
group_states = rc.get_security_group_states(new_interfaces)
@@ -319,5 +321,6 @@ class TestRedisForAgent(test_base.TestBase):
["1.000000000002", "3.000000000004", "5.000000000006",
"7.000000000008", "9.000000000000"],
sg_client.SECURITY_GROUP_ACK)
self.assertEqual(group_states, {VIF(5, 6, 1): False,
VIF(7, 8, 2): True})
self.assertEqual(group_states, {new_interfaces[2]: False,
new_interfaces[3]: True})