Merge "Make security_group_rule_get_by_security_group() honor columns"

This commit is contained in:
Jenkins 2013-10-29 11:38:48 +00:00 committed by Gerrit Code Review
commit f9e59efe87
3 changed files with 39 additions and 18 deletions

View File

@ -1238,10 +1238,11 @@ def security_group_rule_create(context, values):
return IMPL.security_group_rule_create(context, values)
def security_group_rule_get_by_security_group(context, security_group_id):
def security_group_rule_get_by_security_group(context, security_group_id,
columns_to_join=None):
"""Get all rules for a given security group."""
return IMPL.security_group_rule_get_by_security_group(context,
security_group_id)
return IMPL.security_group_rule_get_by_security_group(
context, security_group_id, columns_to_join=columns_to_join)
def security_group_rule_get_by_security_group_grantee(context,

View File

@ -3829,14 +3829,16 @@ def security_group_rule_get(context, security_group_rule_id):
@require_context
def security_group_rule_get_by_security_group(context, security_group_id):
return (_security_group_rule_get_query(context).
filter_by(parent_group_id=security_group_id).
options(joinedload_all('grantee_group.instances.'
'system_metadata')).
options(joinedload('grantee_group.instances.'
'info_cache')).
all())
def security_group_rule_get_by_security_group(context, security_group_id,
columns_to_join=None):
if columns_to_join is None:
columns_to_join = ['grantee_group.instances.system_metadata',
'grantee_group.instances.info_cache']
query = (_security_group_rule_get_query(context).
filter_by(parent_group_id=security_group_id))
for column in columns_to_join:
query = query.options(joinedload_all(column))
return query.all()
@require_context

View File

@ -1027,18 +1027,36 @@ class SecurityGroupRuleTestCase(test.TestCase, ModelsObjectComparatorMixin):
for key, value in self._get_base_rule_values().items():
self.assertEqual(value, security_group_rule[key])
def test_security_group_rule_get_by_security_group(self):
security_group = self._create_security_group({})
def _test_security_group_rule_get_by_security_group(self, columns=None):
instance = db.instance_create(self.ctxt,
{'system_metadata': {'foo': 'bar'}})
security_group = self._create_security_group({
'instances': [instance]})
security_group_rule = self._create_security_group_rule(
{'parent_group': security_group})
{'parent_group': security_group, 'grantee_group': security_group})
security_group_rule1 = self._create_security_group_rule(
{'parent_group': security_group})
found_rules = db.security_group_rule_get_by_security_group(self.ctxt,
security_group['id'])
{'parent_group': security_group, 'grantee_group': security_group})
found_rules = db.security_group_rule_get_by_security_group(
self.ctxt, security_group['id'], columns_to_join=columns)
self.assertEqual(len(found_rules), 2)
rules_ids = [security_group_rule['id'], security_group_rule1['id']]
for rule in found_rules:
self.assertIn(rule['id'], rules_ids)
if columns is None:
self.assertIn('grantee_group', dict(rule.iteritems()))
self.assertIn('instances',
dict(rule.grantee_group.iteritems()))
self.assertIn(
'system_metadata',
dict(rule.grantee_group.instances[0].iteritems()))
self.assertIn(rule['id'], rules_ids)
else:
self.assertNotIn('grantee_group', dict(rule.iteritems()))
def test_security_group_rule_get_by_security_group(self):
self._test_security_group_rule_get_by_security_group()
def test_security_group_rule_get_by_security_group_no_joins(self):
self._test_security_group_rule_get_by_security_group(columns=[])
def test_security_group_rule_get_by_security_group_grantee(self):
security_group = self._create_security_group({})