diff --git a/tacker/tests/unit/common/container/test_kubernetes_utils.py b/tacker/tests/unit/common/container/test_kubernetes_utils.py new file mode 100644 index 000000000..60c85b51b --- /dev/null +++ b/tacker/tests/unit/common/container/test_kubernetes_utils.py @@ -0,0 +1,46 @@ +# Copyright (c) 2021 FUJITSU. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import cryptography.fernet as fernet +from kubernetes import client + +from tacker.common.container import kubernetes_utils +from tacker.tests import base + + +class TestKubernetesHTTPAPI(base.BaseTestCase): + + def setUp(self): + super(TestKubernetesHTTPAPI, self).setUp() + self.kubernetes_http_api = kubernetes_utils.KubernetesHTTPAPI() + + def test_get_extension_api_client(self): + auth = {"auth_url": "auth", 'bearer_token': 'token'} + extensions_v1_beta1_api = \ + self.kubernetes_http_api.get_extension_api_client(auth) + self.assertIsInstance( + extensions_v1_beta1_api, + client.api.extensions_v1beta1_api.ExtensionsV1beta1Api) + + def test_get_core_api_client(self): + auth = {"auth_url": "auth", 'bearer_token': 'token'} + extensions_v1_beta1_api = self.kubernetes_http_api.get_core_api_client( + auth) + self.assertIsInstance(extensions_v1_beta1_api, + client.api.core_api.CoreApi) + + def create_fernet_key(self): + fernet_key, fernet_obj = self.kubernetes_http_api.create_fernet_key() + self.assertEqual(len(fernet_key), 44) + self.assertIsInstance(fernet_obj, fernet.Fernet) diff --git a/tacker/tests/unit/conductor/conductorrpc/test_vim_monitor_rpc.py b/tacker/tests/unit/conductor/conductorrpc/test_vim_monitor_rpc.py new file mode 100644 index 000000000..0bdd6a8a4 --- /dev/null +++ b/tacker/tests/unit/conductor/conductorrpc/test_vim_monitor_rpc.py @@ -0,0 +1,24 @@ +# Copyright (C) 2021 FUJITSU +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from tacker.conductor.conductorrpc import vim_monitor_rpc +from tacker.tests.unit import base as unit_base + + +class TestVIMUpdateRPC(unit_base.TestCase): + + def setUp(self): + super(TestVIMUpdateRPC, self).setUp() + self.vimupdaterpc_object = vim_monitor_rpc.VIMUpdateRPC() diff --git a/tacker/tests/unit/conductor/test_conductor_server.py b/tacker/tests/unit/conductor/test_conductor_server.py index 338c5be6a..fe139a4d9 100644 --- a/tacker/tests/unit/conductor/test_conductor_server.py +++ b/tacker/tests/unit/conductor/test_conductor_server.py @@ -2865,3 +2865,49 @@ class TestConductor(SqlTestCase, unit_base.FixturedTestCase): self.body_data, self.vnfd_pkg_data, vnfd_id) + + def test_update_vim(self): + vim_id = uuidsentinel.vim_id + status = "REACHABLE" + result = self.conductor.update_vim(self.context, vim_id, status) + self.assertEqual(result, "REACHABLE") + + @mock.patch.object(csar_utils, 'load_csar_data') + @mock.patch.object(glance_store, 'load_csar') + @mock.patch.object(glance_store, 'delete_csar') + def test_revert_upload_vnf_package_in_upload_vnf_package_content(self, + mock_load_csar, + mock_load_csar_data, + mock_delete_csar): + mock_load_csar_data.return_value = (mock.ANY, mock.ANY, mock.ANY) + mock_load_csar.return_value = '/var/lib/tacker/5f5d99c6-844a-4c3' \ + '1-9e6d-ab21b87dcfff.zip' + mock_delete_csar.return_value = "" + self.assertRaisesRegex(Exception, + "not enough values to unpack", + self.conductor.upload_vnf_package_content, + self.context, self.vnf_package) + + @mock.patch.object(conductor_server, 'revert_upload_vnf_package') + @mock.patch.object(csar_utils, 'load_csar_data') + @mock.patch.object(glance_store, 'load_csar') + def test_onboard_vnf_package_through_upload_vnf_package_content(self, + mock_load_csar, + mock_load_csar_data, + mock_revert): + vnf_data = fakes.get_vnf_package_vnfd() + vnf_data = self._rename_vnfdata_keys(vnf_data) + mock_load_csar_data.return_value = (vnf_data, [], []) + mock_load_csar.return_value = '/var/lib/tacker/5f5d99c6-844a-4c3' \ + '1-9e6d-ab21b87dcfff.zip' + fake_context = context.Context(tenant_id=uuidsentinel.tenant_id) + self.conductor.upload_vnf_package_content( + fake_context, self.vnf_package) + + def _rename_vnfdata_keys(self, vnf_data): + vnf_data["descriptor_id"] = vnf_data.pop("id") + vnf_data["provider"] = vnf_data.pop("vnf_provider") + vnf_data["product_name"] = vnf_data.pop("vnf_product_name") + vnf_data["software_version"] = vnf_data.pop("vnf_software_version") + vnf_data["descriptor_version"] = vnf_data.pop("vnfd_version") + return vnf_data