Add server security groups to shade

We need a way to add and remove security groups to a server in shade.
This will let us modify them from ansible.

Change-Id: I5602e44720485e20491cf4930605362312274b20
This commit is contained in:
Jamie Lennox 2017-03-29 11:11:59 +11:00
parent 7333bc7ae4
commit 88d8a37cfa
3 changed files with 276 additions and 0 deletions

View File

@ -0,0 +1,4 @@
---
features:
- Add the `add_server_security_groups` and `remove_server_security_groups`
functions to add and remove security groups from a specific server.

View File

@ -1812,6 +1812,97 @@ class OpenStackCloud(_normalize.Normalizer):
return self._normalize_secgroups(groups) return self._normalize_secgroups(groups)
def _get_server_security_groups(self, server, security_groups):
if not self._has_secgroups():
raise OpenStackCloudUnavailableFeature(
"Unavailable feature: security groups"
)
if not isinstance(server, dict):
server = self.get_server(server, bare=True)
if server is None:
self.log.debug('Server %s not found', server)
return None, None
if not isinstance(security_groups, (list, tuple)):
security_groups = [security_groups]
sec_group_objs = []
for sg in security_groups:
if not isinstance(sg, dict):
sg = self.get_security_group(sg)
if sg is None:
self.log.debug('Security group %s not found for adding',
sg)
return None, None
sec_group_objs.append(sg)
return server, sec_group_objs
def add_server_security_groups(self, server, security_groups):
"""Add security groups to a server.
Add existing security groups to an existing server. If the security
groups are already present on the server this will continue unaffected.
:returns: False if server or security groups are undefined, True
otherwise.
:raises: ``OpenStackCloudException``, on operation error.
"""
server, security_groups = self._get_server_security_groups(
server, security_groups)
if not (server and security_groups):
return False
for sg in security_groups:
self._compute_client.post(
'/servers/%s/action' % server.id,
json={'addSecurityGroup': {'name': sg.name}})
return True
def remove_server_security_groups(self, server, security_groups):
"""Remove security groups from a server
Remove existing security groups from an existing server. If the
security groups are not present on the server this will continue
unaffected.
:returns: False if server or security groups are undefined, True
otherwise.
:raises: ``OpenStackCloudException``, on operation error.
"""
server, security_groups = self._get_server_security_groups(
server, security_groups)
if not (server and security_groups):
return False
for sg in security_groups:
try:
self._compute_client.post(
'/servers/%s/action' % server.id,
json={'removeSecurityGroup': {'name': sg.name}})
except OpenStackCloudURINotFound as e:
# NOTE(jamielennox): Is this ok? If we remove something that
# isn't present should we just conclude job done or is that an
# error? Nova returns ok if you try to add a group twice.
self.log.debug(
"The security group %s was not present on server %s so "
"no action was performed", sg.name, server.name)
return True
def list_security_groups(self, filters=None): def list_security_groups(self, filters=None):
"""List all available security groups. """List all available security groups.

View File

@ -372,3 +372,184 @@ class TestSecurityGroups(base.TestCase):
ret = self.cloud.list_server_security_groups(server) ret = self.cloud.list_server_security_groups(server)
self.assertEqual([], ret) self.assertEqual([], ret)
self.assertFalse(mock_nova.servers.list_security_group.called) self.assertFalse(mock_nova.servers.list_security_group.called)
class TestServerSecurityGroups(base.RequestsMockTestCase):
@mock.patch.object(shade.OpenStackCloud, 'nova_client')
def test_add_security_group_to_server_nova(self, mock_nova):
# fake to get server by name, server-name must match
fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE')
mock_nova.servers.list.return_value = [fake_server]
# use nova for secgroup list and return an existing fake
self.has_neutron = False
self.cloud.secgroup_source = 'nova'
mock_nova.security_groups.list.return_value = [nova_grp_obj]
self.register_uris([
dict(
method='POST',
uri='%s/servers/%s/action' % (fakes.COMPUTE_ENDPOINT, '1234'),
validate={'addSecurityGroup': {'name': 'nova-sec-group'}},
status_code=202,
),
])
ret = self.cloud.add_server_security_groups('server-name',
'nova-sec-group')
self.assertTrue(ret)
self.assertTrue(mock_nova.servers.list.called_once)
self.assertTrue(mock_nova.security_groups.list.called_once)
self.assert_calls()
@mock.patch.object(shade.OpenStackCloud, 'nova_client')
@mock.patch.object(shade.OpenStackCloud, 'neutron_client')
def test_add_security_group_to_server_neutron(self,
mock_neutron,
mock_nova):
# fake to get server by name, server-name must match
fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE')
mock_nova.servers.list.return_value = [fake_server]
# use neutron for secgroup list and return an existing fake
self.cloud.secgroup_source = 'neutron'
neutron_return = dict(security_groups=[neutron_grp_dict])
mock_neutron.list_security_groups.return_value = neutron_return
self.register_uris([
dict(
method='POST',
uri='%s/servers/%s/action' % (fakes.COMPUTE_ENDPOINT, '1234'),
validate={'addSecurityGroup': {'name': 'neutron-sec-group'}},
status_code=202,
),
])
ret = self.cloud.add_server_security_groups('server-name',
'neutron-sec-group')
self.assertTrue(ret)
self.assertTrue(mock_nova.servers.list.called_once)
self.assertTrue(mock_neutron.list_securit_groups.called_once)
self.assert_calls()
@mock.patch.object(shade.OpenStackCloud, 'nova_client')
def test_remove_security_group_from_server_nova(self, mock_nova):
# fake to get server by name, server-name must match
fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE')
mock_nova.servers.list.return_value = [fake_server]
# use nova for secgroup list and return an existing fake
self.has_neutron = False
self.cloud.secgroup_source = 'nova'
mock_nova.security_groups.list.return_value = [nova_grp_obj]
self.register_uris([
dict(
method='POST',
uri='%s/servers/%s/action' % (fakes.COMPUTE_ENDPOINT, '1234'),
validate={'removeSecurityGroup': {'name': 'nova-sec-group'}},
),
])
ret = self.cloud.remove_server_security_groups('server-name',
'nova-sec-group')
self.assertTrue(ret)
self.assertTrue(mock_nova.servers.list.called_once)
self.assertTrue(mock_nova.security_groups.list.called_once)
self.assert_calls()
@mock.patch.object(shade.OpenStackCloud, 'nova_client')
@mock.patch.object(shade.OpenStackCloud, 'neutron_client')
def test_remove_security_group_from_server_neutron(self,
mock_neutron,
mock_nova):
# fake to get server by name, server-name must match
fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE')
mock_nova.servers.list.return_value = [fake_server]
# use neutron for secgroup list and return an existing fake
self.cloud.secgroup_source = 'neutron'
neutron_return = dict(security_groups=[neutron_grp_dict])
mock_neutron.list_security_groups.return_value = neutron_return
validate = {'removeSecurityGroup': {'name': 'neutron-sec-group'}}
self.register_uris([
dict(
method='POST',
uri='%s/servers/%s/action' % (fakes.COMPUTE_ENDPOINT, '1234'),
validate=validate,
),
])
ret = self.cloud.remove_server_security_groups('server-name',
'neutron-sec-group')
self.assertTrue(ret)
self.assertTrue(mock_nova.servers.list.called_once)
self.assertTrue(mock_neutron.list_security_groups.called_once)
self.assert_calls()
@mock.patch.object(shade.OpenStackCloud, 'nova_client')
def test_add_bad_security_group_to_server_nova(self, mock_nova):
# fake to get server by name, server-name must match
fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE')
mock_nova.servers.list.return_value = [fake_server]
# use nova for secgroup list and return an existing fake
self.has_neutron = False
self.cloud.secgroup_source = 'nova'
mock_nova.security_groups.list.return_value = [nova_grp_obj]
ret = self.cloud.add_server_security_groups('server-name',
'unknown-sec-group')
self.assertFalse(ret)
self.assertTrue(mock_nova.servers.list.called_once)
self.assertTrue(mock_nova.security_groups.list.called_once)
self.assert_calls()
@mock.patch.object(shade.OpenStackCloud, 'nova_client')
@mock.patch.object(shade.OpenStackCloud, 'neutron_client')
def test_add_bad_security_group_to_server_neutron(self,
mock_neutron,
mock_nova):
# fake to get server by name, server-name must match
fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE')
mock_nova.servers.list.return_value = [fake_server]
# use neutron for secgroup list and return an existing fake
self.cloud.secgroup_source = 'neutron'
neutron_return = dict(security_groups=[neutron_grp_dict])
mock_neutron.list_security_groups.return_value = neutron_return
ret = self.cloud.add_server_security_groups('server-name',
'unknown-sec-group')
self.assertFalse(ret)
self.assertTrue(mock_nova.servers.list.called_once)
self.assertTrue(mock_neutron.list_security_groups.called_once)
self.assert_calls()
@mock.patch.object(shade.OpenStackCloud, 'nova_client')
def test_add_security_group_to_bad_server(self, mock_nova):
# fake to get server by name, server-name must match
fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE')
mock_nova.servers.list.return_value = [fake_server]
ret = self.cloud.add_server_security_groups('unknown-server-name',
'nova-sec-group')
self.assertFalse(ret)
self.assertTrue(mock_nova.servers.list.called_once)
self.assert_calls()