This patch creates the network v2.0 floating-ips-port-forwarding client.

https://docs.openstack.org/api-ref/network/v2/index.html#create-port-forwarding
Signed-off by: Soniya Vyas<svyas@redhat.com>

Change-Id: Icac4495c8a3fae877746652d0c9e00225b0a879d
This commit is contained in:
Soniya Vyas 2021-09-14 16:57:42 +05:30
parent 79da6961ed
commit c08dfd2014
6 changed files with 249 additions and 3 deletions

View File

@ -0,0 +1,6 @@
---
features:
- |
Add a new client to lists, creates, shows information for,
updates and deletes neutron floating ips port forwarding
resource.

View File

@ -76,6 +76,8 @@ class BaseNetworkTest(tempest.test.BaseTestCase):
cls.subnetpools_client = cls.os_primary.subnetpools_client
cls.subnets_client = cls.os_primary.subnets_client
cls.ports_client = cls.os_primary.ports_client
cls.floating_ips_port_forwarding_client =\
cls.os_primary.floating_ips_port_forwarding_client
cls.quotas_client = cls.os_primary.network_quotas_client
cls.floating_ips_client = cls.os_primary.floating_ips_client
cls.security_groups_client = cls.os_primary.security_groups_client

View File

@ -59,6 +59,8 @@ class Manager(clients.ServiceClients):
self.ports_client = self.network.PortsClient()
self.network_quotas_client = self.network.QuotasClient()
self.floating_ips_client = self.network.FloatingIPsClient()
self.floating_ips_port_forwarding_client =\
self.network.FloatingIpsPortForwardingClient()
self.metering_labels_client = self.network.MeteringLabelsClient()
self.metering_label_rules_client = (
self.network.MeteringLabelRulesClient())

View File

@ -15,6 +15,8 @@
from tempest.lib.services.network.agents_client import AgentsClient
from tempest.lib.services.network.extensions_client import ExtensionsClient
from tempest.lib.services.network.floating_ips_client import FloatingIPsClient
from tempest.lib.services.network.floating_ips_port_forwarding_client import \
FloatingIpsPortForwardingClient
from tempest.lib.services.network.log_resource_client import LogResourceClient
from tempest.lib.services.network.loggable_resource_client import \
LoggableResourceClient
@ -45,9 +47,9 @@ from tempest.lib.services.network.trunks_client import TrunksClient
from tempest.lib.services.network.versions_client import NetworkVersionsClient
__all__ = ['AgentsClient', 'ExtensionsClient', 'FloatingIPsClient',
'MeteringLabelRulesClient', 'MeteringLabelsClient',
'NetworksClient', 'NetworkVersionsClient', 'PortsClient',
'QosClient', 'QosMinimumBandwidthRulesClient',
'FloatingIpsPortForwardingClient', 'MeteringLabelRulesClient',
'MeteringLabelsClient', 'NetworksClient', 'NetworkVersionsClient',
'PortsClient', 'QosClient', 'QosMinimumBandwidthRulesClient',
'QosLimitBandwidthRulesClient', 'QuotasClient', 'RoutersClient',
'SecurityGroupRulesClient', 'SecurityGroupsClient',
'SegmentsClient', 'ServiceProvidersClient', 'SubnetpoolsClient',

View File

@ -0,0 +1,78 @@
# Copyright 2021 Red Hat, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.services.network import base
class FloatingIpsPortForwardingClient(base.BaseNetworkClient):
def create_port_forwarding(self, floatingip_id, **kwargs):
"""Creates a floating IP port forwarding.
Creates port forwarding by using the configuration that you define in
the request object.
For a full list of available parameters, please refer to the official
API reference:
https://docs.openstack.org/api-ref/network/v2/index.html#create-port-forwarding
"""
uri = '/floatingips/%s/port_forwardings' % floatingip_id
post_data = {'port_forwarding': kwargs}
return self.create_resource(uri, post_data)
def update_port_forwarding(
self, floatingip_id, port_forwarding_id, **kwargs):
"""Updates a floating IP port_forwarding resource.
For a full list of available parameters, please refer to the official
API reference:
https://docs.openstack.org/api-ref/network/v2/index.html#update-a-port-forwarding
"""
uri = '/floatingips/%s/port_forwardings/%s' % (
floatingip_id, port_forwarding_id)
post_data = {'port_forwarding': kwargs}
return self.update_resource(uri, post_data)
def show_port_forwarding(
self, floatingip_id, port_forwarding_id, **fields):
"""Shows details for a floating IP port forwarding id.
For a full list of available parameters, please refer to the official
API reference:
https://docs.openstack.org/api-ref/network/v2/index.html#show-port-forwarding
"""
uri = '/floatingips/%s/port_forwardings/%s' % (
floatingip_id, port_forwarding_id)
return self.show_resource(uri, **fields)
def delete_port_forwarding(self, floatingip_id, port_forwarding_id):
"""Deletes a floating IP port_forwarding resource.
For a full list of available parameters, please refer to the official
API reference:
https://docs.openstack.org/api-ref/network/v2/index.html#delete-a-floating-ip-port-forwarding
"""
uri = '/floatingips/%s/port_forwardings/%s' % (
floatingip_id, port_forwarding_id)
return self.delete_resource(uri)
def list_port_forwardings(self, floatingip_id, **filters):
"""Lists floating Ip port forwardings.
For a full list of available parameters, please refer to the official
API reference:
https://docs.openstack.org/api-ref/network/v2/index.html#list-floating-ip-port-forwardings-detail
"""
uri = '/floatingips/%s/port_forwardings' % floatingip_id
return self.list_resources(uri, **filters)

View File

@ -0,0 +1,156 @@
# Copyright 2021 Red Hat, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from tempest.lib.services.network import floating_ips_port_forwarding_client
from tempest.tests.lib import fake_auth_provider
from tempest.tests.lib.services import base
class TestFloatingIpsPortForwardingClient(base.BaseServiceTest):
FAKE_PORT_FORWARDING_REQUEST = {
"port_forwarding": {
"protocol": "tcp",
"internal_ip_address": "10.0.0.11",
"internal_port": 25,
"internal_port_id": "1238be08-a2a8-4b8d-addf-fb5e2250e480",
"external_port": 2230,
"description": "Some description",
}
}
FAKE_PORT_FORWARDING_RESPONSE = {
"port_forwarding": {
"protocol": "tcp",
"internal_ip_address": "10.0.0.12",
"internal_port": 26,
"internal_port_id": "1238be08-a2a8-4b8d-addf-fb5e2250e480",
"external_port": 2130,
"description": "Some description",
"id": "825ade3c-9760-4880-8080-8fc2dbab9acc"
}
}
FAKE_PORT_FORWARDINGS = {
"port_forwardings": [
FAKE_PORT_FORWARDING_RESPONSE['port_forwarding']
]
}
FAKE_FLOATINGIP_ID = "a6800594-5b7a-4105-8bfe-723b346ce866"
FAKE_PORT_FORWARDING_ID = "a7800594-5b7a-4105-8bfe-723b346ce866"
def setUp(self):
super(TestFloatingIpsPortForwardingClient, self).setUp()
fake_auth = fake_auth_provider.FakeAuthProvider()
self.floating_ips_port_forwarding_client = \
floating_ips_port_forwarding_client.\
FloatingIpsPortForwardingClient(fake_auth,
"network",
"regionOne")
def _test_create_port_forwarding(self, bytes_body=False):
self.check_service_client_function(
self.floating_ips_port_forwarding_client.
create_port_forwarding,
"tempest.lib.common.rest_client.RestClient.post",
self.FAKE_PORT_FORWARDING_RESPONSE,
bytes_body,
201,
floatingip_id=self.FAKE_FLOATINGIP_ID,
**self.FAKE_PORT_FORWARDING_REQUEST)
def _test_list_port_forwardings(self, bytes_body=False):
self.check_service_client_function(
self.floating_ips_port_forwarding_client.
list_port_forwardings,
"tempest.lib.common.rest_client.RestClient.get",
self.FAKE_PORT_FORWARDINGS,
bytes_body,
200,
floatingip_id=self.FAKE_FLOATINGIP_ID)
def _test_show_port_forwardings(self, bytes_body=False):
self.check_service_client_function(
self.floating_ips_port_forwarding_client.
show_port_forwarding,
"tempest.lib.common.rest_client.RestClient.get",
self.FAKE_PORT_FORWARDING_RESPONSE,
bytes_body,
200,
floatingip_id=self.FAKE_FLOATINGIP_ID,
port_forwarding_id=self.FAKE_PORT_FORWARDING_ID)
def _test_delete_port_forwarding(self):
self.check_service_client_function(
self.floating_ips_port_forwarding_client.
delete_port_forwarding,
"tempest.lib.common.rest_client.RestClient.delete",
{},
status=204,
floatingip_id=self.FAKE_FLOATINGIP_ID,
port_forwarding_id=self.FAKE_PORT_FORWARDING_ID)
def _test_update_port_forwarding(self, bytes_body=False):
update_kwargs = {
"internal_port": "27"
}
resp_body = {
"port_forwarding": copy.deepcopy(
self.FAKE_PORT_FORWARDING_RESPONSE['port_forwarding']
)
}
resp_body["port_forwarding"].update(update_kwargs)
self.check_service_client_function(
self.floating_ips_port_forwarding_client.update_port_forwarding,
"tempest.lib.common.rest_client.RestClient.put",
resp_body,
bytes_body,
200,
floatingip_id=self.FAKE_FLOATINGIP_ID,
port_forwarding_id=self.FAKE_PORT_FORWARDING_ID,
**update_kwargs)
def test_list_port_forwardings_with_str_body(self):
self._test_list_port_forwardings()
def test_list_port_forwardings_with_bytes_body(self):
self._test_list_port_forwardings(bytes_body=True)
def test_show_port_forwardings_with_str_body(self):
self._test_show_port_forwardings()
def test_show_port_forwardings_with_bytes_body(self):
self._test_show_port_forwardings(bytes_body=True)
def test_create_port_forwarding_with_str_body(self):
self._test_create_port_forwarding()
def test_create_port_forwarding_with_bytes_body(self):
self._test_create_port_forwarding(bytes_body=True)
def test_update_port_forwarding_with_str_body(self):
self._test_update_port_forwarding()
def test_update_port_forwarding_with_bytes_body(self):
self._test_update_port_forwarding(bytes_body=True)