manila/manila/tests/share/drivers/container/test_security_service_helper.py
Eduardo Santos 733d6218e6 Add security service update support to the container driver
This implementation adds the functionality to add/update security services
to in use share networks using the container driver. The container driver will
also try to setup security services while creating share servers. Currently, the
only supported security service type is LDAP.

Co-Authored-By: Carlos Eduardo <ces.eduardo98@gmail.com>
Partially Implements: bp add-security-service-in-use-share-networks
Depends-On: I129a794dfd2d179fa2b9a2fed050459d6f00b0de

Change-Id: Ifb8b9ebe6eb0661844c794ca1a32e35105652f72
2021-03-12 18:43:53 -03:00

174 lines
6.3 KiB
Python

# Copyright (c) 2021 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for the Security Service helper module."""
from unittest import mock
import ddt
from manila import exception
from manila.share import configuration
from manila.share.drivers.container import security_service_helper
from manila import test
from manila.tests import db_utils
INVALID_CREDENTIALS_EXIT_CODE = 49
@ddt.ddt
class SecurityServiceHelperTestCase(test.TestCase):
"""Tests DockerExecHelper"""
def setUp(self):
super(SecurityServiceHelperTestCase, self).setUp()
self.fake_conf = configuration.Configuration(None)
self.fake_conf.container_image_name = "fake_image"
self.fake_conf.container_volume_mount_path = "/tmp/shares"
self.security_service_helper = (
security_service_helper.SecurityServiceHelper(
configuration=self.fake_conf))
def test_setup_security_service(self):
share_server = db_utils.create_share_server()
security_service = db_utils.create_security_service()
mock_ldap_bind = self.mock_object(
self.security_service_helper, 'ldap_bind')
self.security_service_helper.setup_security_service(
share_server['id'], security_service)
mock_ldap_bind.assert_called_once_with(
share_server['id'], security_service)
def test_update_security_service(self):
share_server = db_utils.create_share_server()
current_security_service = db_utils.create_security_service()
new_security_service = db_utils.create_security_service()
mock_ldap_bind = self.mock_object(
self.security_service_helper, 'ldap_bind')
self.security_service_helper.update_security_service(
share_server['id'], current_security_service, new_security_service)
mock_ldap_bind.assert_called_once_with(
share_server['id'], new_security_service)
def _setup_test_ldap_bind_tests(self):
share_server = db_utils.create_security_service()
security_service = db_utils.create_security_service()
ldap_get_info = {
'ss_password': security_service['password'],
'ss_user': security_service['user']
}
expected_cmd = [
"docker", "exec", "%s" % share_server['id'], "ldapwhoami", "-x",
"-H", "ldap://localhost:389", "-D",
"cn=%s,dc=example,dc=com" % ldap_get_info[
"ss_user"],
"-w", "%s" % ldap_get_info["ss_password"]]
return share_server, security_service, ldap_get_info, expected_cmd
def test_ldap_bind(self):
share_server, security_service, ldap_get_info, expected_cmd = (
self._setup_test_ldap_bind_tests())
mock_ldap_get_info = self.mock_object(
self.security_service_helper, 'ldap_get_info',
mock.Mock(return_value=ldap_get_info))
mock_ldap_retry_operation = self.mock_object(
self.security_service_helper, 'ldap_retry_operation')
self.security_service_helper.ldap_bind(
share_server['id'], security_service)
mock_ldap_get_info.assert_called_once_with(security_service)
mock_ldap_retry_operation.assert_called_once_with(expected_cmd,
run_as_root=True)
def test_ldap_get_info(self):
security_service = db_utils.create_security_service()
expected_ldap_get_info = {
'ss_password': security_service['password'],
'ss_user': security_service['user']
}
ldap_get_info = self.security_service_helper.ldap_get_info(
security_service)
self.assertEqual(expected_ldap_get_info, ldap_get_info)
@ddt.data(
{'type': 'ldap'},
{'user': 'fake_user'},
{'password': 'fake_password'},
)
def test_ldap_get_info_exception(self, sec_service_data):
self.assertRaises(
exception.ShareBackendException,
self.security_service_helper.ldap_get_info,
sec_service_data
)
def test_ldap_retry_operation(self):
mock_cmd = ["command", "to", "be", "executed"]
mock_execute = self.mock_object(self.security_service_helper,
'_execute')
self.security_service_helper.ldap_retry_operation(mock_cmd,
run_as_root=True)
mock_execute.assert_called_once_with(*mock_cmd, run_as_root=True)
def test_ldap_retry_operation_timeout(self):
mock_cmd = ["command", "to", "be", "executed"]
mock_execute = self.mock_object(
self.security_service_helper, '_execute',
mock.Mock(
side_effect=exception.ProcessExecutionError(exit_code=1)))
self.assertRaises(
exception.ShareBackendException,
self.security_service_helper.ldap_retry_operation,
mock_cmd,
run_as_root=False,
timeout=10)
mock_execute.assert_has_calls([
mock.call(*mock_cmd, run_as_root=False),
mock.call(*mock_cmd, run_as_root=False)])
def test_ldap_retry_operation_invalid_credential(self):
mock_cmd = ["command", "to", "be", "executed"]
mock_execute = self.mock_object(
self.security_service_helper, '_execute',
mock.Mock(
side_effect=exception.ProcessExecutionError(
exit_code=49)))
self.assertRaises(
exception.ShareBackendException,
self.security_service_helper.ldap_retry_operation,
mock_cmd,
run_as_root=False)
mock_execute.assert_called_once_with(*mock_cmd, run_as_root=False)