diff --git a/releasenotes/notes/server-security-groups-840ab28c04f359de.yaml b/releasenotes/notes/server-security-groups-840ab28c04f359de.yaml new file mode 100644 index 000000000..d9de793e9 --- /dev/null +++ b/releasenotes/notes/server-security-groups-840ab28c04f359de.yaml @@ -0,0 +1,4 @@ +--- +features: + - Add the `add_server_security_groups` and `remove_server_security_groups` + functions to add and remove security groups from a specific server. diff --git a/shade/openstackcloud.py b/shade/openstackcloud.py index ed4f2f8ba..ec2dc085f 100644 --- a/shade/openstackcloud.py +++ b/shade/openstackcloud.py @@ -1812,6 +1812,97 @@ class OpenStackCloud(_normalize.Normalizer): return self._normalize_secgroups(groups) + def _get_server_security_groups(self, server, security_groups): + if not self._has_secgroups(): + raise OpenStackCloudUnavailableFeature( + "Unavailable feature: security groups" + ) + + if not isinstance(server, dict): + server = self.get_server(server, bare=True) + + if server is None: + self.log.debug('Server %s not found', server) + return None, None + + if not isinstance(security_groups, (list, tuple)): + security_groups = [security_groups] + + sec_group_objs = [] + + for sg in security_groups: + if not isinstance(sg, dict): + sg = self.get_security_group(sg) + + if sg is None: + self.log.debug('Security group %s not found for adding', + sg) + + return None, None + + sec_group_objs.append(sg) + + return server, sec_group_objs + + def add_server_security_groups(self, server, security_groups): + """Add security groups to a server. + + Add existing security groups to an existing server. If the security + groups are already present on the server this will continue unaffected. + + :returns: False if server or security groups are undefined, True + otherwise. + + :raises: ``OpenStackCloudException``, on operation error. + """ + server, security_groups = self._get_server_security_groups( + server, security_groups) + + if not (server and security_groups): + return False + + for sg in security_groups: + self._compute_client.post( + '/servers/%s/action' % server.id, + json={'addSecurityGroup': {'name': sg.name}}) + + return True + + def remove_server_security_groups(self, server, security_groups): + """Remove security groups from a server + + Remove existing security groups from an existing server. If the + security groups are not present on the server this will continue + unaffected. + + :returns: False if server or security groups are undefined, True + otherwise. + + :raises: ``OpenStackCloudException``, on operation error. + """ + server, security_groups = self._get_server_security_groups( + server, security_groups) + + if not (server and security_groups): + return False + + for sg in security_groups: + try: + self._compute_client.post( + '/servers/%s/action' % server.id, + json={'removeSecurityGroup': {'name': sg.name}}) + + except OpenStackCloudURINotFound as e: + # NOTE(jamielennox): Is this ok? If we remove something that + # isn't present should we just conclude job done or is that an + # error? Nova returns ok if you try to add a group twice. + self.log.debug( + "The security group %s was not present on server %s so " + "no action was performed", sg.name, server.name) + + return True + + def list_security_groups(self, filters=None): """List all available security groups. diff --git a/shade/tests/unit/test_security_groups.py b/shade/tests/unit/test_security_groups.py index 026096ee7..5ce6765d0 100644 --- a/shade/tests/unit/test_security_groups.py +++ b/shade/tests/unit/test_security_groups.py @@ -372,3 +372,184 @@ class TestSecurityGroups(base.TestCase): ret = self.cloud.list_server_security_groups(server) self.assertEqual([], ret) self.assertFalse(mock_nova.servers.list_security_group.called) + + +class TestServerSecurityGroups(base.RequestsMockTestCase): + + @mock.patch.object(shade.OpenStackCloud, 'nova_client') + def test_add_security_group_to_server_nova(self, mock_nova): + # fake to get server by name, server-name must match + fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE') + mock_nova.servers.list.return_value = [fake_server] + + # use nova for secgroup list and return an existing fake + self.has_neutron = False + self.cloud.secgroup_source = 'nova' + mock_nova.security_groups.list.return_value = [nova_grp_obj] + + self.register_uris([ + dict( + method='POST', + uri='%s/servers/%s/action' % (fakes.COMPUTE_ENDPOINT, '1234'), + validate={'addSecurityGroup': {'name': 'nova-sec-group'}}, + status_code=202, + ), + ]) + + ret = self.cloud.add_server_security_groups('server-name', + 'nova-sec-group') + self.assertTrue(ret) + + self.assertTrue(mock_nova.servers.list.called_once) + self.assertTrue(mock_nova.security_groups.list.called_once) + + self.assert_calls() + + @mock.patch.object(shade.OpenStackCloud, 'nova_client') + @mock.patch.object(shade.OpenStackCloud, 'neutron_client') + def test_add_security_group_to_server_neutron(self, + mock_neutron, + mock_nova): + # fake to get server by name, server-name must match + fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE') + mock_nova.servers.list.return_value = [fake_server] + + # use neutron for secgroup list and return an existing fake + self.cloud.secgroup_source = 'neutron' + neutron_return = dict(security_groups=[neutron_grp_dict]) + mock_neutron.list_security_groups.return_value = neutron_return + + self.register_uris([ + dict( + method='POST', + uri='%s/servers/%s/action' % (fakes.COMPUTE_ENDPOINT, '1234'), + validate={'addSecurityGroup': {'name': 'neutron-sec-group'}}, + status_code=202, + ), + ]) + + ret = self.cloud.add_server_security_groups('server-name', + 'neutron-sec-group') + self.assertTrue(ret) + + self.assertTrue(mock_nova.servers.list.called_once) + self.assertTrue(mock_neutron.list_securit_groups.called_once) + + self.assert_calls() + + @mock.patch.object(shade.OpenStackCloud, 'nova_client') + def test_remove_security_group_from_server_nova(self, mock_nova): + # fake to get server by name, server-name must match + fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE') + mock_nova.servers.list.return_value = [fake_server] + + # use nova for secgroup list and return an existing fake + self.has_neutron = False + self.cloud.secgroup_source = 'nova' + mock_nova.security_groups.list.return_value = [nova_grp_obj] + + self.register_uris([ + dict( + method='POST', + uri='%s/servers/%s/action' % (fakes.COMPUTE_ENDPOINT, '1234'), + validate={'removeSecurityGroup': {'name': 'nova-sec-group'}}, + ), + ]) + + ret = self.cloud.remove_server_security_groups('server-name', + 'nova-sec-group') + self.assertTrue(ret) + + self.assertTrue(mock_nova.servers.list.called_once) + self.assertTrue(mock_nova.security_groups.list.called_once) + + self.assert_calls() + + @mock.patch.object(shade.OpenStackCloud, 'nova_client') + @mock.patch.object(shade.OpenStackCloud, 'neutron_client') + def test_remove_security_group_from_server_neutron(self, + mock_neutron, + mock_nova): + # fake to get server by name, server-name must match + fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE') + mock_nova.servers.list.return_value = [fake_server] + + # use neutron for secgroup list and return an existing fake + self.cloud.secgroup_source = 'neutron' + neutron_return = dict(security_groups=[neutron_grp_dict]) + mock_neutron.list_security_groups.return_value = neutron_return + + validate = {'removeSecurityGroup': {'name': 'neutron-sec-group'}} + self.register_uris([ + dict( + method='POST', + uri='%s/servers/%s/action' % (fakes.COMPUTE_ENDPOINT, '1234'), + validate=validate, + ), + ]) + + ret = self.cloud.remove_server_security_groups('server-name', + 'neutron-sec-group') + self.assertTrue(ret) + + self.assertTrue(mock_nova.servers.list.called_once) + self.assertTrue(mock_neutron.list_security_groups.called_once) + + self.assert_calls() + + @mock.patch.object(shade.OpenStackCloud, 'nova_client') + def test_add_bad_security_group_to_server_nova(self, mock_nova): + # fake to get server by name, server-name must match + fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE') + mock_nova.servers.list.return_value = [fake_server] + + # use nova for secgroup list and return an existing fake + self.has_neutron = False + self.cloud.secgroup_source = 'nova' + mock_nova.security_groups.list.return_value = [nova_grp_obj] + + ret = self.cloud.add_server_security_groups('server-name', + 'unknown-sec-group') + self.assertFalse(ret) + + self.assertTrue(mock_nova.servers.list.called_once) + self.assertTrue(mock_nova.security_groups.list.called_once) + + self.assert_calls() + + @mock.patch.object(shade.OpenStackCloud, 'nova_client') + @mock.patch.object(shade.OpenStackCloud, 'neutron_client') + def test_add_bad_security_group_to_server_neutron(self, + mock_neutron, + mock_nova): + # fake to get server by name, server-name must match + fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE') + mock_nova.servers.list.return_value = [fake_server] + + # use neutron for secgroup list and return an existing fake + self.cloud.secgroup_source = 'neutron' + neutron_return = dict(security_groups=[neutron_grp_dict]) + mock_neutron.list_security_groups.return_value = neutron_return + + ret = self.cloud.add_server_security_groups('server-name', + 'unknown-sec-group') + self.assertFalse(ret) + + self.assertTrue(mock_nova.servers.list.called_once) + self.assertTrue(mock_neutron.list_security_groups.called_once) + + self.assert_calls() + + @mock.patch.object(shade.OpenStackCloud, 'nova_client') + def test_add_security_group_to_bad_server(self, mock_nova): + # fake to get server by name, server-name must match + fake_server = fakes.FakeServer('1234', 'server-name', 'ACTIVE') + mock_nova.servers.list.return_value = [fake_server] + + ret = self.cloud.add_server_security_groups('unknown-server-name', + 'nova-sec-group') + self.assertFalse(ret) + + self.assertTrue(mock_nova.servers.list.called_once) + + self.assert_calls()