Improve unit test coverage for dcmanager's APIs (system_peers)
Improves unit test coverage for dcmanager's system_peers API from 68% to 98%. Test plan: All of the tests were created taking into account the output of 'tox -c tox.ini -e cover' command Story: 2007082 Task: 49682 Change-Id: Iec150e1df79c48e1afddebe612dbd27c3e742685 Signed-off-by: rlima <Raphael.Lima@windriver.com>
This commit is contained in:
parent
df3a6a0911
commit
b0bef0dbe8
@ -1,316 +0,0 @@
|
||||
# Copyright (c) 2023-2024 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
from six.moves import http_client
|
||||
|
||||
from dcmanager.db.sqlalchemy import api as db_api
|
||||
from dcmanager.rpc import client as rpc_client
|
||||
from dcmanager.tests.unit.api import test_root_controller as testroot
|
||||
from dcmanager.tests.unit.api.v1.controllers.mixins import APIMixin
|
||||
from dcmanager.tests.unit.api.v1.controllers.mixins import DeleteMixin
|
||||
from dcmanager.tests.unit.api.v1.controllers.mixins import GetMixin
|
||||
from dcmanager.tests.unit.api.v1.controllers.mixins import PostJSONMixin
|
||||
from dcmanager.tests.unit.api.v1.controllers.mixins import UpdateMixin
|
||||
from dcmanager.tests import utils
|
||||
|
||||
SAMPLE_SYSTEM_PEER_UUID = str(uuid.uuid4())
|
||||
SAMPLE_SYSTEM_PEER_NAME = 'SystemPeer1'
|
||||
SAMPLE_MANAGER_ENDPOINT = 'http://127.0.0.1:5000'
|
||||
SAMPLE_MANAGER_USERNAME = 'admin'
|
||||
SAMPLE_MANAGER_PASSWORD = 'password'
|
||||
SAMPLE_ADMINISTRATIVE_STATE = 'enabled'
|
||||
SAMPLE_HEARTBEAT_INTERVAL = 10
|
||||
SAMPLE_HEARTBEAT_FAILURE_THRESHOLD = 3
|
||||
SAMPLE_HEARTBEAT_FAILURES_POLICY = 'alarm'
|
||||
SAMPLE_HEARTBEAT_MAINTENANCE_TIMEOUT = 600
|
||||
SAMPLE_PEER_CONTROLLER_GATEWAY_IP = '128.128.128.1'
|
||||
|
||||
|
||||
class SystemPeerAPIMixin(APIMixin):
|
||||
|
||||
API_PREFIX = '/v1.0/system-peers'
|
||||
RESULT_KEY = 'system_peers'
|
||||
EXPECTED_FIELDS = ['id',
|
||||
'peer-uuid',
|
||||
'peer-name',
|
||||
'manager-endpoint',
|
||||
'manager-username',
|
||||
'peer-controller-gateway-address',
|
||||
'administrative-state',
|
||||
'heartbeat-interval',
|
||||
'heartbeat-failure-threshold',
|
||||
'heartbeat-failure-policy',
|
||||
'heartbeat-maintenance-timeout',
|
||||
'created-at',
|
||||
'updated-at']
|
||||
|
||||
def setUp(self):
|
||||
super(SystemPeerAPIMixin, self).setUp()
|
||||
self.fake_rpc_client.some_method = mock.MagicMock()
|
||||
|
||||
def _get_test_system_peer_dict(self, data_type, **kw):
|
||||
# id should not be part of the structure
|
||||
system_peer = {
|
||||
'peer_uuid': kw.get('peer_uuid', SAMPLE_SYSTEM_PEER_UUID),
|
||||
'peer_name': kw.get('peer_name', SAMPLE_SYSTEM_PEER_NAME),
|
||||
'administrative_state': kw.get('administrative_state',
|
||||
SAMPLE_ADMINISTRATIVE_STATE),
|
||||
'heartbeat_interval': kw.get('heartbeat_interval',
|
||||
SAMPLE_HEARTBEAT_INTERVAL),
|
||||
'heartbeat_failure_threshold': kw.get(
|
||||
'heartbeat_failure_threshold', SAMPLE_HEARTBEAT_FAILURE_THRESHOLD),
|
||||
'heartbeat_failure_policy': kw.get(
|
||||
'heartbeat_failure_policy', SAMPLE_HEARTBEAT_FAILURES_POLICY),
|
||||
'heartbeat_maintenance_timeout': kw.get(
|
||||
'heartbeat_maintenance_timeout',
|
||||
SAMPLE_HEARTBEAT_MAINTENANCE_TIMEOUT)
|
||||
}
|
||||
|
||||
if data_type == 'db':
|
||||
system_peer['endpoint'] = kw.get('manager_endpoint',
|
||||
SAMPLE_MANAGER_ENDPOINT)
|
||||
system_peer['username'] = kw.get('manager_username',
|
||||
SAMPLE_MANAGER_USERNAME)
|
||||
system_peer['password'] = kw.get('manager_password',
|
||||
SAMPLE_MANAGER_PASSWORD)
|
||||
system_peer['gateway_ip'] = kw.get(
|
||||
'peer_controller_gateway_ip', SAMPLE_PEER_CONTROLLER_GATEWAY_IP)
|
||||
else:
|
||||
system_peer['manager_endpoint'] = kw.get('manager_endpoint',
|
||||
SAMPLE_MANAGER_ENDPOINT)
|
||||
system_peer['manager_username'] = kw.get('manager_username',
|
||||
SAMPLE_MANAGER_USERNAME)
|
||||
system_peer['manager_password'] = kw.get('manager_password',
|
||||
SAMPLE_MANAGER_PASSWORD)
|
||||
system_peer['peer_controller_gateway_address'] = kw.get(
|
||||
'peer_controller_gateway_ip', SAMPLE_PEER_CONTROLLER_GATEWAY_IP)
|
||||
return system_peer
|
||||
|
||||
def _post_get_test_system_peer(self, **kw):
|
||||
post_body = self._get_test_system_peer_dict('dict', **kw)
|
||||
return post_body
|
||||
|
||||
# The following methods are required for subclasses of APIMixin
|
||||
|
||||
def get_api_prefix(self):
|
||||
return self.API_PREFIX
|
||||
|
||||
def get_result_key(self):
|
||||
return self.RESULT_KEY
|
||||
|
||||
def get_expected_api_fields(self):
|
||||
return self.EXPECTED_FIELDS
|
||||
|
||||
def get_omitted_api_fields(self):
|
||||
return []
|
||||
|
||||
def _create_db_object(self, context, **kw):
|
||||
creation_fields = self._get_test_system_peer_dict('db', **kw)
|
||||
return db_api.system_peer_create(context, **creation_fields)
|
||||
|
||||
def get_post_object(self):
|
||||
return self._post_get_test_system_peer()
|
||||
|
||||
def get_update_object(self):
|
||||
update_object = {
|
||||
'peer_controller_gateway_address': '192.168.205.1'
|
||||
}
|
||||
return update_object
|
||||
|
||||
|
||||
# Combine System Peer API with mixins to test post, get, update and delete
|
||||
class TestSystemPeerPost(testroot.DCManagerApiTest,
|
||||
SystemPeerAPIMixin, PostJSONMixin):
|
||||
def setUp(self):
|
||||
super(TestSystemPeerPost, self).setUp()
|
||||
|
||||
def verify_post_failure(self, response):
|
||||
# Failures will return text rather than JSON
|
||||
self.assertEqual(response.content_type, 'text/plain')
|
||||
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_create_with_numerical_uuid_fails(self, mock_client):
|
||||
# A numerical uuid is not permitted. otherwise the 'get' operations
|
||||
# which support getting by either name or ID could become confused
|
||||
# if a name for one peer was the same as an ID for another.
|
||||
ndict = self.get_post_object()
|
||||
ndict['peer_uuid'] = '123'
|
||||
response = self.app.post_json(self.get_api_prefix(),
|
||||
ndict,
|
||||
headers=self.get_api_headers(),
|
||||
expect_errors=True)
|
||||
self.verify_post_failure(response)
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_create_with_blank_uuid_fails(self, mock_client):
|
||||
# An empty name is not permitted
|
||||
ndict = self.get_post_object()
|
||||
ndict['peer_uuid'] = ''
|
||||
response = self.app.post_json(self.get_api_prefix(),
|
||||
ndict,
|
||||
headers=self.get_api_headers(),
|
||||
expect_errors=True)
|
||||
self.verify_post_failure(response)
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_create_with_empty_manager_endpoint_fails(self, mock_client):
|
||||
# An empty description is considered invalid
|
||||
ndict = self.get_post_object()
|
||||
ndict['manager_endpoint'] = ''
|
||||
response = self.app.post_json(self.get_api_prefix(),
|
||||
ndict,
|
||||
headers=self.get_api_headers(),
|
||||
expect_errors=True)
|
||||
self.verify_post_failure(response)
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_create_with_wrong_manager_endpoint_fails(self, mock_client):
|
||||
# An empty description is considered invalid
|
||||
ndict = self.get_post_object()
|
||||
ndict['manager_endpoint'] = 'ftp://somepath'
|
||||
response = self.app.post_json(self.get_api_prefix(),
|
||||
ndict,
|
||||
headers=self.get_api_headers(),
|
||||
expect_errors=True)
|
||||
self.verify_post_failure(response)
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_create_with_wrong_peergw_ip_fails(self, mock_client):
|
||||
# An empty description is considered invalid
|
||||
ndict = self.get_post_object()
|
||||
ndict['peer_controller_gateway_address'] = '123'
|
||||
response = self.app.post_json(self.get_api_prefix(),
|
||||
ndict,
|
||||
headers=self.get_api_headers(),
|
||||
expect_errors=True)
|
||||
self.verify_post_failure(response)
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_create_with_bad_administrative_state(self, mock_client):
|
||||
# update_apply_type must be either 'enabled' or 'disabled'
|
||||
ndict = self.get_post_object()
|
||||
ndict['administrative_state'] = 'something_invalid'
|
||||
response = self.app.post_json(self.get_api_prefix(),
|
||||
ndict,
|
||||
headers=self.get_api_headers(),
|
||||
expect_errors=True)
|
||||
self.verify_post_failure(response)
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_create_with_bad_heartbeat_interval(self, mock_client):
|
||||
# heartbeat_interval must be an integer between 1 and 600
|
||||
ndict = self.get_post_object()
|
||||
# All the entries in bad_values should be considered invalid
|
||||
bad_values = [0, 601, -1, 'abc']
|
||||
for bad_value in bad_values:
|
||||
ndict['heartbeat_interval'] = bad_value
|
||||
response = self.app.post_json(self.get_api_prefix(),
|
||||
ndict,
|
||||
headers=self.get_api_headers(),
|
||||
expect_errors=True)
|
||||
self.verify_post_failure(response)
|
||||
|
||||
|
||||
class TestSystemPeerGet(testroot.DCManagerApiTest,
|
||||
SystemPeerAPIMixin, GetMixin):
|
||||
def setUp(self):
|
||||
super(TestSystemPeerGet, self).setUp()
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_get_single_by_uuid(self, mock_client):
|
||||
# create a system peer
|
||||
context = utils.dummy_context()
|
||||
peer_uuid = str(uuid.uuid4())
|
||||
self._create_db_object(context, peer_uuid=peer_uuid)
|
||||
|
||||
# Test that a GET operation for a valid ID works
|
||||
response = self.app.get(self.get_single_url(peer_uuid),
|
||||
headers=self.get_api_headers())
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.OK)
|
||||
self.validate_entry(response.json)
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_get_single_by_name(self, mock_client):
|
||||
# create a system peer
|
||||
context = utils.dummy_context()
|
||||
peer_name = 'TestPeer'
|
||||
self._create_db_object(context, peer_name=peer_name)
|
||||
|
||||
# Test that a GET operation for a valid ID works
|
||||
response = self.app.get(self.get_single_url(peer_name),
|
||||
headers=self.get_api_headers())
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.OK)
|
||||
self.validate_entry(response.json)
|
||||
|
||||
|
||||
class TestSystemPeerUpdate(testroot.DCManagerApiTest,
|
||||
SystemPeerAPIMixin, UpdateMixin):
|
||||
def setUp(self):
|
||||
super(TestSystemPeerUpdate, self).setUp()
|
||||
|
||||
def validate_updated_fields(self, sub_dict, full_obj):
|
||||
for key, value in sub_dict.items():
|
||||
key = key.replace('_', '-')
|
||||
self.assertEqual(value, full_obj.get(key))
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_update_invalid_administrative_state(self, mock_client):
|
||||
context = utils.dummy_context()
|
||||
single_obj = self._create_db_object(context)
|
||||
update_data = {
|
||||
'administrative_state': 'something_bad'
|
||||
}
|
||||
response = self.app.patch_json(self.get_single_url(single_obj.id),
|
||||
headers=self.get_api_headers(),
|
||||
params=update_data,
|
||||
expect_errors=True)
|
||||
# Failures will return text rather than json
|
||||
self.assertEqual(response.content_type, 'text/plain')
|
||||
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_update_invalid_heartbeat_interval(self, mock_client):
|
||||
context = utils.dummy_context()
|
||||
single_obj = self._create_db_object(context)
|
||||
update_data = {
|
||||
'heartbeat_interval': -1
|
||||
}
|
||||
response = self.app.patch_json(self.get_single_url(single_obj.id),
|
||||
headers=self.get_api_headers(),
|
||||
params=update_data,
|
||||
expect_errors=True)
|
||||
# Failures will return text rather than json
|
||||
self.assertEqual(response.content_type, 'text/plain')
|
||||
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
|
||||
|
||||
|
||||
class TestSystemPeerDelete(testroot.DCManagerApiTest,
|
||||
SystemPeerAPIMixin, DeleteMixin):
|
||||
def setUp(self):
|
||||
super(TestSystemPeerDelete, self).setUp()
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_delete_by_uuid(self, mock_client):
|
||||
context = utils.dummy_context()
|
||||
peer_uuid = str(uuid.uuid4())
|
||||
self._create_db_object(context, peer_uuid=peer_uuid)
|
||||
response = self.app.delete_json(self.get_single_url(peer_uuid),
|
||||
headers=self.get_api_headers())
|
||||
self.assertEqual(response.status_int, 200)
|
||||
|
||||
@mock.patch.object(rpc_client, 'ManagerClient')
|
||||
def test_delete_by_name(self, mock_client):
|
||||
context = utils.dummy_context()
|
||||
peer_name = 'TestPeer'
|
||||
self._create_db_object(context, peer_name=peer_name)
|
||||
response = self.app.delete_json(self.get_single_url(peer_name),
|
||||
headers=self.get_api_headers())
|
||||
self.assertEqual(response.status_int, 200)
|
@ -0,0 +1,698 @@
|
||||
# Copyright (c) 2023-2024 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
|
||||
import http.client
|
||||
import json
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
from oslo_messaging import RemoteError
|
||||
|
||||
from dcmanager.api.controllers.v1 import system_peers
|
||||
from dcmanager.common import consts
|
||||
from dcmanager.db.sqlalchemy import api as db_api
|
||||
from dcmanager.tests.unit.api.test_root_controller import DCManagerApiTest
|
||||
from dcmanager.tests.unit.api.v1.controllers.mixins import APIMixin
|
||||
from dcmanager.tests.unit.api.v1.controllers.mixins import DeleteMixin
|
||||
from dcmanager.tests.unit.api.v1.controllers.mixins import GetMixin
|
||||
from dcmanager.tests.unit.api.v1.controllers.mixins import PostJSONMixin
|
||||
from dcmanager.tests.unit.api.v1.controllers.mixins import UpdateMixin
|
||||
from dcmanager.tests.unit.common import fake_subcloud
|
||||
|
||||
SAMPLE_SYSTEM_PEER_UUID = str(uuid.uuid4())
|
||||
SAMPLE_SYSTEM_PEER_NAME = 'SystemPeer1'
|
||||
SAMPLE_MANAGER_ENDPOINT = 'http://127.0.0.1:5000'
|
||||
SAMPLE_MANAGER_USERNAME = 'admin'
|
||||
SAMPLE_MANAGER_PASSWORD = 'password'
|
||||
SAMPLE_ADMINISTRATIVE_STATE = 'enabled'
|
||||
SAMPLE_HEARTBEAT_INTERVAL = 10
|
||||
SAMPLE_HEARTBEAT_FAILURE_THRESHOLD = 3
|
||||
SAMPLE_HEARTBEAT_FAILURES_POLICY = 'alarm'
|
||||
SAMPLE_HEARTBEAT_MAINTENANCE_TIMEOUT = 600
|
||||
SAMPLE_PEER_CONTROLLER_GATEWAY_IP = '128.128.128.1'
|
||||
|
||||
|
||||
class SystemPeersAPIMixin(APIMixin):
|
||||
API_PREFIX = '/v1.0/system-peers'
|
||||
RESULT_KEY = 'system_peers'
|
||||
EXPECTED_FIELDS = [
|
||||
'id', 'peer-uuid', 'peer-name', 'manager-endpoint', 'manager-username',
|
||||
'peer-controller-gateway-address', 'administrative-state',
|
||||
'heartbeat-interval', 'heartbeat-failure-threshold',
|
||||
'heartbeat-failure-policy', 'heartbeat-maintenance-timeout', 'created-at',
|
||||
'updated-at'
|
||||
]
|
||||
|
||||
def _get_test_system_peer_dict(self, data_type, **kw):
|
||||
# id should not be part of the structure
|
||||
system_peer = {
|
||||
'peer_uuid': kw.get('peer_uuid', SAMPLE_SYSTEM_PEER_UUID),
|
||||
'peer_name': kw.get('peer_name', SAMPLE_SYSTEM_PEER_NAME),
|
||||
'administrative_state': kw.get(
|
||||
'administrative_state', SAMPLE_ADMINISTRATIVE_STATE
|
||||
),
|
||||
'heartbeat_interval': kw.get(
|
||||
'heartbeat_interval', SAMPLE_HEARTBEAT_INTERVAL
|
||||
),
|
||||
'heartbeat_failure_threshold': kw.get(
|
||||
'heartbeat_failure_threshold', SAMPLE_HEARTBEAT_FAILURE_THRESHOLD
|
||||
),
|
||||
'heartbeat_failure_policy': kw.get(
|
||||
'heartbeat_failure_policy', SAMPLE_HEARTBEAT_FAILURES_POLICY
|
||||
),
|
||||
'heartbeat_maintenance_timeout': kw.get(
|
||||
'heartbeat_maintenance_timeout', SAMPLE_HEARTBEAT_MAINTENANCE_TIMEOUT
|
||||
)
|
||||
}
|
||||
|
||||
if data_type == 'db':
|
||||
system_peer['endpoint'] = \
|
||||
kw.get('manager_endpoint', SAMPLE_MANAGER_ENDPOINT)
|
||||
system_peer['username'] = \
|
||||
kw.get('manager_username', SAMPLE_MANAGER_USERNAME)
|
||||
system_peer['password'] = \
|
||||
kw.get('manager_password', SAMPLE_MANAGER_PASSWORD)
|
||||
system_peer['gateway_ip'] = kw.get(
|
||||
'peer_controller_gateway_ip', SAMPLE_PEER_CONTROLLER_GATEWAY_IP
|
||||
)
|
||||
else:
|
||||
system_peer['manager_endpoint'] = \
|
||||
kw.get('manager_endpoint', SAMPLE_MANAGER_ENDPOINT)
|
||||
system_peer['manager_username'] = \
|
||||
kw.get('manager_username', SAMPLE_MANAGER_USERNAME)
|
||||
system_peer['manager_password'] = \
|
||||
kw.get('manager_password', SAMPLE_MANAGER_PASSWORD)
|
||||
system_peer['peer_controller_gateway_address'] = kw.get(
|
||||
'peer_controller_gateway_ip', SAMPLE_PEER_CONTROLLER_GATEWAY_IP
|
||||
)
|
||||
|
||||
return system_peer
|
||||
|
||||
def _post_get_test_system_peer(self, **kw):
|
||||
return self._get_test_system_peer_dict('dict', **kw)
|
||||
|
||||
# The following methods are required for subclasses of APIMixin
|
||||
def get_api_prefix(self):
|
||||
return self.API_PREFIX
|
||||
|
||||
def get_result_key(self):
|
||||
return self.RESULT_KEY
|
||||
|
||||
def get_expected_api_fields(self):
|
||||
return self.EXPECTED_FIELDS
|
||||
|
||||
def get_omitted_api_fields(self):
|
||||
return []
|
||||
|
||||
def _create_db_object(self, context, **kw):
|
||||
creation_fields = self._get_test_system_peer_dict('db', **kw)
|
||||
return db_api.system_peer_create(context, **creation_fields)
|
||||
|
||||
def get_post_object(self):
|
||||
return self._post_get_test_system_peer()
|
||||
|
||||
def get_update_object(self):
|
||||
return {'peer_controller_gateway_address': '192.168.205.1'}
|
||||
|
||||
|
||||
class SystemPeersPropertiesValidationMixin(object):
|
||||
"""Specifies common test cases to validate payload properties in requests"""
|
||||
|
||||
def _remove_empty_string_in_patch_request(self, invalid_values):
|
||||
"""Removes the empty string in patch requests
|
||||
|
||||
When the request method is patch, the properties can be sent as empty string
|
||||
values, which does not happen in post requests. Because of that, it's
|
||||
necessary to remove it from the validated values.
|
||||
"""
|
||||
|
||||
if self.method == self.app.patch_json:
|
||||
invalid_values.remove("")
|
||||
|
||||
def test_request_fails_without_payload(self):
|
||||
"""Test request fails without payload"""
|
||||
|
||||
self.params = {}
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST, "Body required"
|
||||
)
|
||||
|
||||
@mock.patch.object(json, 'loads')
|
||||
def test_request_fails_with_json_loads_exception(self, mock_json_loads):
|
||||
"""Test request fails with json loads exception"""
|
||||
|
||||
mock_json_loads.side_effect = Exception()
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST, "Request body is malformed."
|
||||
)
|
||||
mock_json_loads.assert_called_once()
|
||||
|
||||
def test_request_fails_with_invalid_payload(self):
|
||||
"""Test request fails with invalid payload"""
|
||||
|
||||
self.params = "invalid"
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST, "Invalid request body format"
|
||||
)
|
||||
|
||||
def test_request_fails_with_invalid_uuid(self):
|
||||
"""Test request fails with invalid uuid
|
||||
|
||||
A numerical uuid is not permitted. Otherwise, the 'get' operation which
|
||||
supports getting a system peer by either name or ID could become confusing
|
||||
if the name for a peer was the same as the ID for another.
|
||||
"""
|
||||
|
||||
invalid_values = ["", "999"]
|
||||
self._remove_empty_string_in_patch_request(invalid_values)
|
||||
|
||||
for index, invalid_value in enumerate(invalid_values, start=1):
|
||||
self.params["peer_uuid"] = invalid_value
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST, "Invalid peer uuid", index
|
||||
)
|
||||
|
||||
def test_request_fails_with_invalid_name(self):
|
||||
"""Test request fails with invalid name"""
|
||||
|
||||
invalid_values = ["", "999", "a" * 256, ".*+?|()[]{}^$"]
|
||||
self._remove_empty_string_in_patch_request(invalid_values)
|
||||
|
||||
for index, invalid_value in enumerate(invalid_values, start=1):
|
||||
self.params["peer_name"] = invalid_value
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST, "Invalid peer name", index
|
||||
)
|
||||
|
||||
def test_request_fails_with_invalid_manager_endpoint(self):
|
||||
"""Test request fails with invalid manager endpoint"""
|
||||
|
||||
invalid_values = [
|
||||
"", "ftp://somepath",
|
||||
"a" * system_peers.MAX_SYSTEM_PEER_MANAGER_ENDPOINT_LEN
|
||||
]
|
||||
self._remove_empty_string_in_patch_request(invalid_values)
|
||||
|
||||
for index, invalid_value in enumerate(invalid_values, start=1):
|
||||
self.params["manager_endpoint"] = invalid_value
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST, "Invalid peer manager_endpoint",
|
||||
call_count=index
|
||||
)
|
||||
|
||||
def test_request_fails_with_invalid_manager_username(self):
|
||||
"""Test request fails with invalid manager username"""
|
||||
|
||||
invalid_values = [
|
||||
"", "a" * system_peers.MAX_SYSTEM_PEER_MANAGER_USERNAME_LEN
|
||||
]
|
||||
self._remove_empty_string_in_patch_request(invalid_values)
|
||||
|
||||
for index, invalid_value in enumerate(invalid_values, start=1):
|
||||
self.params["manager_username"] = invalid_value
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST, "Invalid peer manager_username",
|
||||
call_count=index
|
||||
)
|
||||
|
||||
def test_request_fails_with_invalid_manager_password(self):
|
||||
"""Test request fails with invalid manager password"""
|
||||
|
||||
invalid_values = [
|
||||
"", "a" * system_peers.MAX_SYSTEM_PEER_MANAGER_PASSWORD_LEN
|
||||
]
|
||||
self._remove_empty_string_in_patch_request(invalid_values)
|
||||
|
||||
for index, invalid_value in enumerate(invalid_values, start=1):
|
||||
self.params["manager_password"] = invalid_value
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST, "Invalid peer manager_password",
|
||||
call_count=index
|
||||
)
|
||||
|
||||
def test_request_fails_with_invalid_peer_controller_gateway_address(self):
|
||||
"""Test request fails with invalid peer controller gateway address"""
|
||||
|
||||
invalid_values = [
|
||||
"", "a" * system_peers.MAX_SYSTEM_PEER_STRING_DEFAULT_LEN,
|
||||
"192.168.0.0.1"
|
||||
]
|
||||
self._remove_empty_string_in_patch_request(invalid_values)
|
||||
|
||||
for index, invalid_value in enumerate(invalid_values, start=1):
|
||||
self.params["peer_controller_gateway_address"] = invalid_value
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST,
|
||||
"Invalid peer peer_controller_gateway_address", call_count=index
|
||||
)
|
||||
|
||||
def test_request_fails_with_invalid_administrative_state(self):
|
||||
"""Test request fails with invalid administrative state
|
||||
|
||||
The administrative state must be either enabled or disabled.
|
||||
"""
|
||||
|
||||
self.params["administrative_state"] = "fake"
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST, "Invalid peer administrative_state"
|
||||
)
|
||||
|
||||
def test_request_fails_with_invalid_heartbeat_interval(self):
|
||||
"""Test request fails with invalid heartbeat interval
|
||||
|
||||
The heartbeat interval must be between 1 and 600.
|
||||
"""
|
||||
|
||||
invalid_values = [
|
||||
system_peers.MIN_SYSTEM_PEER_HEARTBEAT_INTERVAL - 1,
|
||||
system_peers.MAX_SYSTEM_PEER_HEARTBEAT_INTERVAL + 1,
|
||||
-1, "fake"
|
||||
]
|
||||
|
||||
for index, invalid_value in enumerate(invalid_values, start=1):
|
||||
self.params["heartbeat_interval"] = invalid_value
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST, "Invalid peer heartbeat_interval",
|
||||
call_count=index
|
||||
)
|
||||
|
||||
def test_request_fails_with_invalid_heartbeat_failure_threshold(self):
|
||||
"""Test request fails with invalid heartbeat failure threshold"""
|
||||
|
||||
invalid_values = [
|
||||
system_peers.MIN_SYSTEM_PEER_HEARTBEAT_FAILURE_THRESHOLD - 1,
|
||||
system_peers.MAX_SYSTEM_PEER_HEARTBEAT_FAILURE_THRESHOLD + 1,
|
||||
-1, "fake"
|
||||
]
|
||||
|
||||
# When the request method is patch, the invalid_value 0 results in the if
|
||||
# condition returning false as if a value was not sent. Because of that,
|
||||
# it needs to be removed from the validation.
|
||||
if self.method == self.app.patch_json:
|
||||
invalid_values.remove(
|
||||
system_peers.MIN_SYSTEM_PEER_HEARTBEAT_FAILURE_THRESHOLD - 1
|
||||
)
|
||||
|
||||
for index, invalid_value in enumerate(invalid_values, start=1):
|
||||
self.params["heartbeat_failure_threshold"] = invalid_value
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST,
|
||||
"Invalid peer heartbeat_failure_threshold", call_count=index
|
||||
)
|
||||
|
||||
def test_request_fails_with_invalid_heartbeat_failure_policy(self):
|
||||
"""Test request fails with invalid heartbeat failure policy
|
||||
|
||||
The heartbeat failure policy must be either alarm, rehome or delegate.
|
||||
"""
|
||||
|
||||
self.params["heartbeat_failure_policy"] = "fake"
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST,
|
||||
"Invalid peer heartbeat_failure_policy"
|
||||
)
|
||||
|
||||
def test_request_fails_with_invalid_heartbeat_maintenance_timeout(self):
|
||||
"""Test request fails with invalid heartbeat maintenance timeout"""
|
||||
|
||||
invalid_values = [
|
||||
system_peers.MIN_SYSTEM_PEER_HEARTBEAT_MAINTENACE_TIMEOUT - 1,
|
||||
system_peers.MAX_SYSTEM_PEER_HEARTBEAT_MAINTENACE_TIMEOUT + 1,
|
||||
-1, "fake"
|
||||
]
|
||||
|
||||
for index, invalid_value in enumerate(invalid_values, start=1):
|
||||
self.params["heartbeat_maintenance_timeout"] = invalid_value
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST,
|
||||
"Invalid peer heartbeat_maintenance_timeout", call_count=index
|
||||
)
|
||||
|
||||
|
||||
class BaseTestSystemPeersController(DCManagerApiTest, SystemPeersAPIMixin):
|
||||
"""Base class for testing the SystemPeersController"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
self.url = "/v1.0/system-peers"
|
||||
|
||||
self._mock_rpc_client()
|
||||
|
||||
|
||||
class TestSystemPeersController(BaseTestSystemPeersController):
|
||||
"""Test class for SystemPeersController"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
def test_unmapped_method(self):
|
||||
"""Test requesting an unmapped method results in success with null content"""
|
||||
|
||||
self.method = self.app.put
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_response(response)
|
||||
self.assertEqual(response.text, "null")
|
||||
|
||||
|
||||
class TestSystemPeersGet(BaseTestSystemPeersController, GetMixin):
|
||||
"""Test class for get requests"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
self.method = self.app.get
|
||||
|
||||
# TODO(rlima): update the GetMixin to create the object in setUp rather
|
||||
# than in the test case and update this class. This should be done in
|
||||
# all of the classes
|
||||
|
||||
def _assert_response_content(self, response):
|
||||
"""Assert the response content from get requests
|
||||
|
||||
The database returned values use _ while the returned dict from the API
|
||||
response return values with -.
|
||||
"""
|
||||
|
||||
for key, value in self.system_peer.items():
|
||||
key = key.replace("_", "-")
|
||||
|
||||
if key == "created-at" or key == "updated-at" or "deleted-at":
|
||||
continue
|
||||
|
||||
self.assertEqual(response.json[key], value)
|
||||
|
||||
def test_get_succeeds_by_id(self):
|
||||
"""Test get succeeds by id"""
|
||||
|
||||
self.system_peer = self._create_db_object(self.ctx)
|
||||
|
||||
self.url = f"{self.url}/{self.system_peer.id}"
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_response(response)
|
||||
self._assert_response_content(response)
|
||||
|
||||
def test_get_succeeds_by_name(self):
|
||||
"""Test get succeeds by name"""
|
||||
|
||||
self.system_peer = self._create_db_object(self.ctx)
|
||||
|
||||
self.url = f"{self.url}/{self.system_peer.peer_name}"
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_response(response)
|
||||
self._assert_response_content(response)
|
||||
|
||||
def test_get_succeeds_with_subcloud_peer_groups(self):
|
||||
"""Test get succeeds with subcloud peer groups"""
|
||||
|
||||
self.system_peer = self._create_db_object(self.ctx)
|
||||
|
||||
self.url = f"{self.url}/{self.system_peer.peer_name}/True"
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_response(response)
|
||||
self.assertTrue("subcloud_peer_groups" in response.json)
|
||||
self.assertEqual(response.json["subcloud_peer_groups"], [])
|
||||
|
||||
|
||||
class TestSystemPeersPost(
|
||||
BaseTestSystemPeersController, SystemPeersPropertiesValidationMixin,
|
||||
PostJSONMixin
|
||||
):
|
||||
"""Test class for post requests"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
self.method = self.app.post_json
|
||||
self.params = self.get_post_object()
|
||||
|
||||
def test_post_fails_with_db_api_duplicate_entry_exception(self):
|
||||
"""Test post fails with db api duplicate entry exception"""
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_response(response)
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.CONFLICT,
|
||||
"A system peer with this UUID already exists"
|
||||
)
|
||||
|
||||
@mock.patch.object(db_api, "system_peer_create")
|
||||
def test_post_fails_with_db_api_remote_error(self, mock_db_api):
|
||||
"""Test post fails with db api remote error"""
|
||||
|
||||
mock_db_api.side_effect = RemoteError("msg", "value")
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.UNPROCESSABLE_ENTITY, "value"
|
||||
)
|
||||
|
||||
@mock.patch.object(db_api, "system_peer_create")
|
||||
def test_post_fails_with_db_api_generic_exception(self, mock_db_api):
|
||||
"""Test post fails with db api generic exception"""
|
||||
|
||||
mock_db_api.side_effect = Exception()
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.INTERNAL_SERVER_ERROR,
|
||||
"Unable to create system peer"
|
||||
)
|
||||
|
||||
|
||||
class BaseTestSystemPeersPatch(BaseTestSystemPeersController):
|
||||
"""Base test class for patch requests"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
self.method = self.app.patch_json
|
||||
self.params = self.get_post_object()
|
||||
|
||||
|
||||
class TestSystemPeersPatchPropertiesValidation(
|
||||
BaseTestSystemPeersPatch, SystemPeersPropertiesValidationMixin
|
||||
):
|
||||
"""Test class for validating the payload properties in patch requests"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
self.system_peer = self._create_db_object(self.ctx)
|
||||
self.url = f"{self.url}/{self.system_peer.peer_uuid}"
|
||||
|
||||
|
||||
class TestSystemPeersPatch(BaseTestSystemPeersPatch, UpdateMixin):
|
||||
"""Test class for patch requests"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
# Overrides validate_updated_fields from UpdateMixin
|
||||
def validate_updated_fields(self, sub_dict, full_obj):
|
||||
for key, value in sub_dict.items():
|
||||
key = key.replace('_', '-')
|
||||
self.assertEqual(value, full_obj.get(key))
|
||||
|
||||
def test_patch_fails_with_inexistent_system_peer(self):
|
||||
"""Test patch fails with inexistent system peer"""
|
||||
|
||||
self.url = f"{self.url}/9999"
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.NOT_FOUND, "System Peer not found"
|
||||
)
|
||||
|
||||
def test_patch_fails_without_properties_to_update(self):
|
||||
"""Test patch fails without properties to update"""
|
||||
|
||||
system_peer = self._create_db_object(self.ctx)
|
||||
|
||||
self.url = f"{self.url}/{system_peer.peer_uuid}"
|
||||
self.params = {"key": "value"}
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST, "nothing to update"
|
||||
)
|
||||
|
||||
@mock.patch.object(db_api, "system_peer_update")
|
||||
def test_patch_fails_with_db_api_remote_error(self, mock_db_api):
|
||||
"""Test patch fails with db api remote error"""
|
||||
|
||||
system_peer = self._create_db_object(self.ctx)
|
||||
|
||||
self.url = f"{self.url}/{system_peer.peer_uuid}"
|
||||
|
||||
mock_db_api.side_effect = RemoteError("msg", "value")
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.UNPROCESSABLE_ENTITY, "value"
|
||||
)
|
||||
|
||||
@mock.patch.object(db_api, "system_peer_update")
|
||||
def test_patch_fails_with_db_api_generic_exception(self, mock_db_api):
|
||||
"""Test patch fails with db api generic exception"""
|
||||
|
||||
system_peer = self._create_db_object(self.ctx)
|
||||
|
||||
self.url = f"{self.url}/{system_peer.peer_uuid}"
|
||||
|
||||
mock_db_api.side_effect = Exception()
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.INTERNAL_SERVER_ERROR,
|
||||
"Unable to update system peer"
|
||||
)
|
||||
|
||||
|
||||
class TestSystemPeersDelete(BaseTestSystemPeersController, DeleteMixin):
|
||||
"""Test class for delete requests"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
self.method = self.app.delete
|
||||
|
||||
def test_delete_succeeds_by_id(self):
|
||||
"""Test delete succeeds by id"""
|
||||
|
||||
system_peer = self._create_db_object(self.ctx)
|
||||
|
||||
self.url = f"{self.url}/{system_peer.peer_uuid}"
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_response(response)
|
||||
self.assertEqual(len(db_api.system_peer_get_all(self.ctx)), 0)
|
||||
|
||||
def test_delete_succeeds_by_name(self):
|
||||
"""Test delete succeeds by name"""
|
||||
|
||||
system_peer = self._create_db_object(self.ctx)
|
||||
|
||||
self.url = f"{self.url}/{system_peer.peer_name}"
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_response(response)
|
||||
self.assertEqual(len(db_api.system_peer_get_all(self.ctx)), 0)
|
||||
|
||||
def test_delete_fails_with_existing_association(self):
|
||||
"""Test delete fails with existing association"""
|
||||
|
||||
system_peer = self._create_db_object(self.ctx)
|
||||
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
|
||||
|
||||
db_api.peer_group_association_create(
|
||||
self.ctx, subcloud.peer_group_id, system_peer.id,
|
||||
consts.PEER_GROUP_PRIMARY_PRIORITY, consts.ASSOCIATION_TYPE_PRIMARY,
|
||||
consts.ASSOCIATION_SYNC_STATUS_IN_SYNC, "None"
|
||||
)
|
||||
|
||||
self.url = f"{self.url}/{system_peer.peer_uuid}"
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.BAD_REQUEST,
|
||||
"Cannot delete a system peer which is associated with peer group."
|
||||
)
|
||||
|
||||
@mock.patch.object(db_api, "system_peer_destroy")
|
||||
def test_delete_fails_with_db_api_remote_error(self, mock_db_api):
|
||||
"""Test delete fails with db api remote error"""
|
||||
|
||||
system_peer = self._create_db_object(self.ctx)
|
||||
|
||||
self.url = f"{self.url}/{system_peer.peer_uuid}"
|
||||
|
||||
mock_db_api.side_effect = RemoteError("msg", "value")
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.UNPROCESSABLE_ENTITY, "value"
|
||||
)
|
||||
|
||||
@mock.patch.object(db_api, "system_peer_destroy")
|
||||
def test_delete_fails_with_db_api_generic_exception(self, mock_db_api):
|
||||
"""Test delete fails with db api generic exception"""
|
||||
|
||||
system_peer = self._create_db_object(self.ctx)
|
||||
|
||||
self.url = f"{self.url}/{system_peer.peer_uuid}"
|
||||
|
||||
mock_db_api.side_effect = Exception()
|
||||
|
||||
response = self._send_request()
|
||||
|
||||
self._assert_pecan_and_response(
|
||||
response, http.client.INTERNAL_SERVER_ERROR,
|
||||
"Unable to delete system peer"
|
||||
)
|
Loading…
Reference in New Issue
Block a user