Blackify openstack.network

We disable the H301 check since it clashes with black.

Black used with the '-l 79 -S' flags.

A future change will ignore this commit in git-blame history by adding a
'git-blame-ignore-revs' file.

Change-Id: Ib987ac513c4f5e527bf4122b784d0a69856b903e
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane
2023-05-03 12:07:08 +01:00
parent bcf99f3433
commit f526b990f3
125 changed files with 3129 additions and 1868 deletions

View File

@@ -18,12 +18,23 @@ class NetworkResource(resource.Resource):
_allow_unknown_attrs_in_body = True
def _prepare_request(self, requires_id=None, prepend_key=False,
patch=False, base_path=None, params=None,
if_revision=None, **kwargs):
def _prepare_request(
self,
requires_id=None,
prepend_key=False,
patch=False,
base_path=None,
params=None,
if_revision=None,
**kwargs
):
req = super(NetworkResource, self)._prepare_request(
requires_id=requires_id, prepend_key=prepend_key, patch=patch,
base_path=base_path, params=params)
requires_id=requires_id,
prepend_key=prepend_key,
patch=patch,
base_path=base_path,
params=params,
)
if if_revision is not None:
req.headers['If-Match'] = "revision_number=%d" % if_revision
return req

File diff suppressed because it is too large Load Diff

View File

@@ -17,6 +17,7 @@ from openstack import utils
class AddressGroup(resource.Resource):
"""Address group extension."""
resource_key = 'address_group'
resources_key = 'address_groups'
base_path = '/address-groups'
@@ -31,9 +32,11 @@ class AddressGroup(resource.Resource):
_allow_unknown_attrs_in_body = True
_query_mapping = resource.QueryParameters(
"sort_key", "sort_dir",
'name', 'description',
'project_id'
"sort_key",
"sort_dir",
'name',
'description',
'project_id',
)
# Properties

View File

@@ -15,6 +15,7 @@ from openstack import resource
class AddressScope(resource.Resource):
"""Address scope extension."""
resource_key = 'address_scope'
resources_key = 'address_scopes'
base_path = '/address-scopes'
@@ -29,7 +30,8 @@ class AddressScope(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'name', 'ip_version',
'name',
'ip_version',
'project_id',
is_shared='shared',
)

View File

@@ -17,6 +17,7 @@ from openstack import utils
class Agent(resource.Resource):
"""Neutron agent extension."""
resource_key = 'agent'
resources_key = 'agents'
base_path = '/agents'
@@ -32,9 +33,14 @@ class Agent(resource.Resource):
# NOTE: We skip query for JSON fields and datetime fields
_query_mapping = resource.QueryParameters(
'agent_type', 'availability_zone', 'binary', 'description', 'host',
'agent_type',
'availability_zone',
'binary',
'description',
'host',
'topic',
is_admin_state_up='admin_state_up', is_alive='alive',
is_admin_state_up='admin_state_up',
is_alive='alive',
)
# Properties
@@ -85,8 +91,9 @@ class Agent(resource.Resource):
def remove_agent_from_network(self, session, network_id):
body = {'network_id': network_id}
url = utils.urljoin(self.base_path, self.id, 'dhcp-networks',
network_id)
url = utils.urljoin(
self.base_path, self.id, 'dhcp-networks', network_id
)
session.delete(url, json=body)
def add_router_to_agent(self, session, router):

View File

@@ -30,7 +30,9 @@ class AvailabilityZone(_resource.Resource):
# NOTE: We don't support query by state yet because there is a mapping
# at neutron side difficult to map.
_query_mapping = _resource.QueryParameters(
name='availability_zone', resource='agent_type')
name='availability_zone',
resource='agent_type',
)
# Properties
#: Name of the availability zone.

View File

@@ -43,7 +43,8 @@ class BgpSpeaker(resource.Resource):
#: Whether to enable or disable the advertisement of floating ip host
#: routes by the BGP Speaker. True by default.
advertise_floating_ip_host_routes = resource.Body(
'advertise_floating_ip_host_routes')
'advertise_floating_ip_host_routes'
)
#: Whether to enable or disable the advertisement of tenant network
#: routes by the BGP Speaker. True by default.
advertise_tenant_networks = resource.Body('advertise_tenant_networks')
@@ -164,6 +165,5 @@ class BgpSpeaker(resource.Resource):
:param bgp_agent_id: The id of the dynamic routing agent from which
remove the speaker.
"""
url = utils.urljoin('agents', bgp_agent_id,
'bgp-drinstances', self.id)
url = utils.urljoin('agents', bgp_agent_id, 'bgp-drinstances', self.id)
session.delete(url)

View File

@@ -31,9 +31,15 @@ class FirewallGroup(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'egress_firewall_policy_id',
'ingress_firewall_policy_id', 'name', 'shared', 'status', 'ports',
'project_id')
'description',
'egress_firewall_policy_id',
'ingress_firewall_policy_id',
'name',
'shared',
'status',
'ports',
'project_id',
)
# Properties
#: The administrative state of the firewall group, which is up (true) or

View File

@@ -33,7 +33,12 @@ class FirewallPolicy(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'firewall_rules', 'name', 'project_id', 'shared')
'description',
'firewall_rules',
'name',
'project_id',
'shared',
)
# Properties
#: Each time that the firewall policy or its associated rules are changed,

View File

@@ -31,9 +31,20 @@ class FirewallRule(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'action', 'description', 'destination_ip_address', 'name',
'destination_port', 'enabled', 'ip_version', 'project_id', 'protocol',
'shared', 'source_ip_address', 'source_port', 'firewall_policy_id')
'action',
'description',
'destination_ip_address',
'name',
'destination_port',
'enabled',
'ip_version',
'project_id',
'protocol',
'shared',
'source_ip_address',
'source_port',
'firewall_policy_id',
)
# Properties
#: The action that the API performs on traffic that matches the firewall

View File

@@ -29,7 +29,11 @@ class Flavor(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'name', 'service_type', is_enabled='enabled')
'description',
'name',
'service_type',
is_enabled='enabled',
)
# properties
#: description for the flavor
@@ -44,7 +48,8 @@ class Flavor(resource.Resource):
service_profile_ids = resource.Body('service_profiles', type=list)
def associate_flavor_with_service_profile(
self, session, service_profile_id=None):
self, session, service_profile_id=None
):
flavor_id = self.id
url = utils.urljoin(self.base_path, flavor_id, 'service_profiles')
body = {"service_profile": {"id": service_profile_id}}
@@ -52,9 +57,13 @@ class Flavor(resource.Resource):
return resp.json()
def disassociate_flavor_from_service_profile(
self, session, service_profile_id=None):
self, session, service_profile_id=None
):
flavor_id = self.id
url = utils.urljoin(
self.base_path, flavor_id, 'service_profiles', service_profile_id)
session.delete(url,)
self.base_path, flavor_id, 'service_profiles', service_profile_id
)
session.delete(
url,
)
return None

View File

@@ -30,12 +30,19 @@ class FloatingIP(_base.NetworkResource, tag.TagMixin):
# For backward compatibility include tenant_id as query param
_query_mapping = resource.QueryParameters(
'description', 'fixed_ip_address',
'floating_ip_address', 'floating_network_id',
'port_id', 'router_id', 'status', 'subnet_id',
'project_id', 'tenant_id',
'description',
'fixed_ip_address',
'floating_ip_address',
'floating_network_id',
'port_id',
'router_id',
'status',
'subnet_id',
'project_id',
'tenant_id',
tenant_id='project_id',
**tag.TagMixin._tag_query_parameters)
**tag.TagMixin._tag_query_parameters
)
# Properties
#: Timestamp at which the floating IP was created.

View File

@@ -28,8 +28,14 @@ class HealthMonitor(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'delay', 'expected_codes', 'http_method', 'max_retries',
'timeout', 'type', 'url_path', 'project_id',
'delay',
'expected_codes',
'http_method',
'max_retries',
'timeout',
'type',
'url_path',
'project_id',
is_admin_state_up='adminstate_up',
)

View File

@@ -28,9 +28,15 @@ class Listener(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'connection_limit', 'default_pool_id', 'default_tls_container_ref',
'description', 'name', 'project_id', 'protocol', 'protocol_port',
is_admin_state_up='admin_state_up'
'connection_limit',
'default_pool_id',
'default_tls_container_ref',
'description',
'name',
'project_id',
'protocol',
'protocol_port',
is_admin_state_up='admin_state_up',
)
# Properties

View File

@@ -28,9 +28,15 @@ class LoadBalancer(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'name', 'project_id', 'provider', 'provisioning_status',
'tenant_id', 'vip_address', 'vip_subnet_id',
is_admin_state_up='admin_state_up'
'description',
'name',
'project_id',
'provider',
'provisioning_status',
'tenant_id',
'vip_address',
'vip_subnet_id',
is_admin_state_up='admin_state_up',
)
# Properties

View File

@@ -18,6 +18,7 @@ from openstack import resource
class LocalIP(resource.Resource):
"""Local IP extension."""
resource_name = "local ip"
resource_key = "local_ip"
resources_key = "local_ips"
@@ -33,10 +34,14 @@ class LocalIP(resource.Resource):
_allow_unknown_attrs_in_body = True
_query_mapping = resource.QueryParameters(
"sort_key", "sort_dir",
'name', 'description',
'project_id', 'network_id',
'local_port_id', 'local_ip_address',
"sort_key",
"sort_dir",
'name',
'description',
'project_id',
'network_id',
'local_port_id',
'local_ip_address',
'ip_mode',
)

View File

@@ -18,6 +18,7 @@ from openstack import resource
class LocalIPAssociation(resource.Resource):
"""Local IP extension."""
resource_key = "port_association"
resources_key = "port_associations"
base_path = "/local_ips/%(local_ip_id)s/port_associations"
@@ -32,8 +33,11 @@ class LocalIPAssociation(resource.Resource):
_allow_unknown_attrs_in_body = True
_query_mapping = resource.QueryParameters(
'fixed_port_id', 'fixed_ip', 'host',
'fixed_port_id',
'fixed_ip',
'host',
)
# Properties
#: The fixed port ID.
fixed_port_id = resource.Body('fixed_port_id')

View File

@@ -28,7 +28,9 @@ class MeteringLabel(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'name', 'project_id',
'description',
'name',
'project_id',
is_shared='shared',
)

View File

@@ -28,8 +28,12 @@ class MeteringLabelRule(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'direction', 'metering_label_id', 'remote_ip_prefix',
'source_ip_prefix', 'destination_ip_prefix', 'project_id',
'direction',
'metering_label_id',
'remote_ip_prefix',
'source_ip_prefix',
'destination_ip_prefix',
'project_id',
)
# Properties
@@ -49,13 +53,15 @@ class MeteringLabelRule(resource.Resource):
tenant_id = resource.Body('tenant_id', deprecated=True)
#: The remote IP prefix to be associated with this metering label rule.
remote_ip_prefix = resource.Body(
'remote_ip_prefix', deprecated=True,
'remote_ip_prefix',
deprecated=True,
deprecation_reason="The use of 'remote_ip_prefix' in metering label "
"rules is deprecated and will be removed in future "
"releases. One should use instead, the "
"'source_ip_prefix' and/or 'destination_ip_prefix' "
"parameters. For more details, you can check the "
"spec: https://review.opendev.org/#/c/744702/.")
"rules is deprecated and will be removed in future "
"releases. One should use instead, the "
"'source_ip_prefix' and/or 'destination_ip_prefix' "
"parameters. For more details, you can check the "
"spec: https://review.opendev.org/#/c/744702/.",
)
#: The source IP prefix to be associated with this metering label rule.
source_ip_prefix = resource.Body('source_ip_prefix')

View File

@@ -29,9 +29,15 @@ class NDPProxy(resource.Resource):
_allow_unknown_attrs_in_body = True
_query_mapping = resource.QueryParameters(
"sort_key", "sort_dir",
'name', 'description', 'project_id',
'router_id', 'port_id', 'ip_address')
"sort_key",
"sort_dir",
'name',
'description',
'project_id',
'router_id',
'port_id',
'ip_address',
)
# Properties
#: Timestamp at which the NDP proxy was created.

View File

@@ -28,7 +28,9 @@ class Network(_base.NetworkResource, tag.TagMixin):
# NOTE: We don't support query on list or datetime fields yet
_query_mapping = resource.QueryParameters(
'description', 'name', 'status',
'description',
'name',
'status',
'project_id',
ipv4_address_scope_id='ipv4_address_scope',
ipv6_address_scope_id='ipv6_address_scope',
@@ -68,13 +70,14 @@ class Network(_base.NetworkResource, tag.TagMixin):
#: The port security status, which is enabled ``True`` or disabled
#: ``False``. *Type: bool* *Default: False*
#: Available for multiple provider extensions.
is_port_security_enabled = resource.Body('port_security_enabled',
type=bool,
default=False)
is_port_security_enabled = resource.Body(
'port_security_enabled', type=bool, default=False
)
#: Whether or not the router is external.
#: *Type: bool* *Default: False*
is_router_external = resource.Body('router:external', type=bool,
default=False)
is_router_external = resource.Body(
'router:external', type=bool, default=False
)
#: Indicates whether this network is shared across all tenants.
#: By default, only administrative users can change this value.
#: *Type: bool*

View File

@@ -29,8 +29,10 @@ class NetworkIPAvailability(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'ip_version', 'network_id', 'network_name',
'project_id'
'ip_version',
'network_id',
'network_name',
'project_id',
)
# Properties

View File

@@ -31,9 +31,16 @@ class NetworkSegmentRange(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'name', 'default', 'shared', 'project_id',
'network_type', 'physical_network', 'minimum', 'maximum',
'used', 'available'
'name',
'default',
'shared',
'project_id',
'network_type',
'physical_network',
'minimum',
'maximum',
'used',
'available',
)
# Properties

View File

@@ -28,8 +28,14 @@ class Pool(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'lb_algorithm', 'name',
'protocol', 'provider', 'subnet_id', 'virtual_ip_id', 'listener_id',
'description',
'lb_algorithm',
'name',
'protocol',
'provider',
'subnet_id',
'virtual_ip_id',
'listener_id',
'project_id',
is_admin_state_up='admin_state_up',
load_balancer_id='loadbalancer_id',

View File

@@ -28,7 +28,11 @@ class PoolMember(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'address', 'name', 'protocol_port', 'subnet_id', 'weight',
'address',
'name',
'protocol_port',
'subnet_id',
'weight',
'project_id',
is_admin_state_up='admin_state_up',
)

View File

@@ -30,11 +30,25 @@ class Port(_base.NetworkResource, tag.TagMixin):
# NOTE: we skip query on list or datetime fields for now
_query_mapping = resource.QueryParameters(
'binding:host_id', 'binding:profile', 'binding:vif_details',
'binding:vif_type', 'binding:vnic_type',
'description', 'device_id', 'device_owner', 'fields', 'fixed_ips',
'id', 'ip_address', 'mac_address', 'name', 'network_id', 'status',
'subnet_id', 'project_id', 'security_groups',
'binding:host_id',
'binding:profile',
'binding:vif_details',
'binding:vif_type',
'binding:vnic_type',
'description',
'device_id',
'device_owner',
'fields',
'fixed_ips',
'id',
'ip_address',
'mac_address',
'name',
'network_id',
'status',
'subnet_id',
'project_id',
'security_groups',
is_admin_state_up='admin_state_up',
is_port_security_enabled='port_security_enabled',
security_group_ids='security_groups',
@@ -44,8 +58,9 @@ class Port(_base.NetworkResource, tag.TagMixin):
# Properties
#: Allowed address pairs list. Dictionary key ``ip_address`` is required
#: and key ``mac_address`` is optional.
allowed_address_pairs: List[dict] = resource.Body('allowed_address_pairs',
type=list)
allowed_address_pairs: List[dict] = resource.Body(
'allowed_address_pairs', type=list
)
#: The ID of the host where the port is allocated. In some cases,
#: different implementations can run on different hosts.
binding_host_id = resource.Body('binding:host_id')
@@ -104,8 +119,9 @@ class Port(_base.NetworkResource, tag.TagMixin):
is_admin_state_up = resource.Body('admin_state_up', type=bool)
#: The port security status, which is enabled ``True`` or disabled
#: ``False``. *Type: bool* *Default: False*
is_port_security_enabled = resource.Body('port_security_enabled',
type=bool, default=False)
is_port_security_enabled = resource.Body(
'port_security_enabled', type=bool, default=False
)
#: The MAC address of an allowed address pair.
mac_address = resource.Body('mac_address')
#: The port name.
@@ -120,8 +136,9 @@ class Port(_base.NetworkResource, tag.TagMixin):
#: Tenant_id (deprecated attribute).
tenant_id = resource.Body('tenant_id', deprecated=True)
#: Whether to propagate uplink status of the port. *Type: bool*
propagate_uplink_status = resource.Body('propagate_uplink_status',
type=bool)
propagate_uplink_status = resource.Body(
'propagate_uplink_status', type=bool
)
#: Read-only. The ID of the QoS policy attached to the network where the
# port is bound.
qos_network_policy_id = resource.Body('qos_network_policy_id')

View File

@@ -30,7 +30,9 @@ class PortForwarding(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'internal_port_id', 'external_port', 'protocol'
'internal_port_id',
'external_port',
'protocol',
)
# Properties

View File

@@ -29,7 +29,9 @@ class QoSPolicy(resource.Resource, tag.TagMixin):
allow_list = True
_query_mapping = resource.QueryParameters(
'name', 'description', 'is_default',
'name',
'description',
'is_default',
'project_id',
is_shared='shared',
**tag.TagMixin._tag_query_parameters

View File

@@ -28,7 +28,11 @@ class QoSRuleType(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'type', 'drivers', 'all_rules', 'all_supported')
'type',
'drivers',
'all_rules',
'all_supported',
)
# Properties
#: QoS rule type name.

View File

@@ -60,10 +60,12 @@ class Quota(resource.Resource):
#: The maximum amount of security groups you can create. *Type: int*
security_groups = resource.Body('security_group', type=int)
def _prepare_request(self, requires_id=True, prepend_key=False,
base_path=None, **kwargs):
_request = super(Quota, self)._prepare_request(requires_id,
prepend_key)
def _prepare_request(
self, requires_id=True, prepend_key=False, base_path=None, **kwargs
):
_request = super(Quota, self)._prepare_request(
requires_id, prepend_key
)
if self.resource_key in _request.body:
_body = _request.body[self.resource_key]
else:

View File

@@ -28,8 +28,12 @@ class RBACPolicy(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'action', 'object_id', 'object_type', 'project_id',
'target_project_id', target_project_id='target_tenant',
'action',
'object_id',
'object_type',
'project_id',
'target_project_id',
target_project_id='target_tenant',
)
# Properties

View File

@@ -30,7 +30,11 @@ class Router(_base.NetworkResource, tag.TagMixin):
# NOTE: We don't support query on datetime, list or dict fields
_query_mapping = resource.QueryParameters(
'description', 'flavor_id', 'name', 'status', 'project_id',
'description',
'flavor_id',
'name',
'status',
'project_id',
is_admin_state_up='admin_state_up',
is_distributed='distributed',
is_ha='ha',
@@ -40,8 +44,9 @@ class Router(_base.NetworkResource, tag.TagMixin):
# Properties
#: Availability zone hints to use when scheduling the router.
#: *Type: list of availability zone names*
availability_zone_hints = resource.Body('availability_zone_hints',
type=list)
availability_zone_hints = resource.Body(
'availability_zone_hints', type=list
)
#: Availability zones for the router.
#: *Type: list of availability zone names*
availability_zones = resource.Body('availability_zones', type=list)
@@ -155,8 +160,7 @@ class Router(_base.NetworkResource, tag.TagMixin):
:returns: The body of the response as a dictionary.
"""
url = utils.urljoin(self.base_path, self.id,
'add_gateway_router')
url = utils.urljoin(self.base_path, self.id, 'add_gateway_router')
resp = session.put(url, json=body)
return resp.json()
@@ -169,8 +173,7 @@ class Router(_base.NetworkResource, tag.TagMixin):
:returns: The body of the response as a dictionary.
"""
url = utils.urljoin(self.base_path, self.id,
'remove_gateway_router')
url = utils.urljoin(self.base_path, self.id, 'remove_gateway_router')
resp = session.put(url, json=body)
return resp.json()
@@ -188,4 +191,5 @@ class L3AgentRouter(Router):
allow_delete = False
allow_list = True
# NOTE: No query parameter is supported

View File

@@ -27,8 +27,16 @@ class SecurityGroup(_base.NetworkResource, tag.TagMixin):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'fields', 'id', 'name', 'stateful', 'project_id',
'tenant_id', 'revision_number', 'sort_dir', 'sort_key',
'description',
'fields',
'id',
'name',
'stateful',
'project_id',
'tenant_id',
'revision_number',
'sort_dir',
'sort_key',
**tag.TagMixin._tag_query_parameters
)

View File

@@ -27,13 +27,21 @@ class SecurityGroupRule(_base.NetworkResource, tag.TagMixin):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'direction', 'id', 'protocol',
'remote_group_id', 'security_group_id',
'description',
'direction',
'id',
'protocol',
'remote_group_id',
'security_group_id',
'remote_address_group_id',
'port_range_max', 'port_range_min',
'remote_ip_prefix', 'revision_number',
'project_id', 'tenant_id',
'sort_dir', 'sort_key',
'port_range_max',
'port_range_min',
'remote_ip_prefix',
'revision_number',
'project_id',
'tenant_id',
'sort_dir',
'sort_key',
ether_type='ethertype',
**tag.TagMixin._tag_query_parameters
)
@@ -89,7 +97,8 @@ class SecurityGroupRule(_base.NetworkResource, tag.TagMixin):
def _prepare_request(self, *args, **kwargs):
_request = super(SecurityGroupRule, self)._prepare_request(
*args, **kwargs)
*args, **kwargs
)
# Old versions of Neutron do not handle being passed a
# remote_address_group_id and raise and error. Remove it from
# the body if it is blank.

View File

@@ -28,8 +28,12 @@ class Segment(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'name', 'network_id', 'network_type',
'physical_network', 'segmentation_id',
'description',
'name',
'network_id',
'network_type',
'physical_network',
'segmentation_id',
)
# Properties

View File

@@ -28,7 +28,9 @@ class ServiceProfile(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'driver', 'project_id',
'description',
'driver',
'project_id',
is_enabled='enabled',
)
# Properties

View File

@@ -27,8 +27,9 @@ class ServiceProvider(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'service_type', 'name',
is_default='default'
'service_type',
'name',
is_default='default',
)
# Properties

View File

@@ -28,9 +28,17 @@ class Subnet(_base.NetworkResource, tag.TagMixin):
# NOTE: Query on list or datetime fields are currently not supported.
_query_mapping = resource.QueryParameters(
'cidr', 'description', 'gateway_ip', 'ip_version',
'ipv6_address_mode', 'ipv6_ra_mode', 'name', 'network_id',
'segment_id', 'dns_publish_fixed_ip', 'project_id',
'cidr',
'description',
'gateway_ip',
'ip_version',
'ipv6_address_mode',
'ipv6_ra_mode',
'name',
'network_id',
'segment_id',
'dns_publish_fixed_ip',
'project_id',
is_dhcp_enabled='enable_dhcp',
subnet_pool_id='subnetpool_id',
use_default_subnet_pool='use_default_subnetpool',
@@ -87,6 +95,5 @@ class Subnet(_base.NetworkResource, tag.TagMixin):
updated_at = resource.Body('updated_at')
#: Whether to use the default subnet pool to obtain a CIDR.
use_default_subnet_pool = resource.Body(
'use_default_subnetpool',
type=bool
'use_default_subnetpool', type=bool
)

View File

@@ -28,8 +28,12 @@ class SubnetPool(resource.Resource, tag.TagMixin):
allow_list = True
_query_mapping = resource.QueryParameters(
'address_scope_id', 'description', 'ip_version', 'is_default',
'name', 'project_id',
'address_scope_id',
'description',
'ip_version',
'is_default',
'name',
'project_id',
is_shared='shared',
**tag.TagMixin._tag_query_parameters
)

View File

@@ -30,8 +30,10 @@ class TapFlow(resource.Resource):
_allow_unknown_attrs_in_body = True
_query_mapping = resource.QueryParameters(
"sort_key", "sort_dir",
'name', 'project_id'
"sort_key",
"sort_dir",
'name',
'project_id',
)
# Properties

View File

@@ -30,8 +30,7 @@ class TapService(resource.Resource):
_allow_unknown_attrs_in_body = True
_query_mapping = resource.QueryParameters(
"sort_key", "sort_dir",
'name', 'project_id'
"sort_key", "sort_dir", 'name', 'project_id'
)
# Properties

View File

@@ -30,7 +30,11 @@ class Trunk(resource.Resource, tag.TagMixin):
allow_list = True
_query_mapping = resource.QueryParameters(
'name', 'description', 'port_id', 'status', 'sub_ports',
'name',
'description',
'port_id',
'status',
'sub_ports',
'project_id',
is_admin_state_up='admin_state_up',
**tag.TagMixin._tag_query_parameters

View File

@@ -28,8 +28,11 @@ class VpnEndpointGroup(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'name', 'project_id', 'tenant_id',
type='endpoint_type'
'description',
'name',
'project_id',
'tenant_id',
type='endpoint_type',
)
# Properties

View File

@@ -15,6 +15,7 @@ from openstack import resource
class VpnIkePolicy(resource.Resource):
"""VPN IKE policy extension."""
resource_key = 'ikepolicy'
resources_key = 'ikepolicies'
base_path = '/vpn/ikepolicies'
@@ -27,8 +28,14 @@ class VpnIkePolicy(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'auth_algorithm', 'description', 'encryption_algorithm', 'ike_version',
'name', 'pfs', 'project_id', 'phase1_negotiation_mode',
'auth_algorithm',
'description',
'encryption_algorithm',
'ike_version',
'name',
'pfs',
'project_id',
'phase1_negotiation_mode',
)
# Properties

View File

@@ -26,8 +26,13 @@ class VpnIpsecPolicy(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'auth_algorithm', 'description', 'encryption_algorithm', 'name', 'pfs',
'project_id', 'phase1_negotiation_mode',
'auth_algorithm',
'description',
'encryption_algorithm',
'name',
'pfs',
'project_id',
'phase1_negotiation_mode',
)
# Properties

View File

@@ -26,11 +26,24 @@ class VpnIPSecSiteConnection(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'auth_mode', 'description', 'ikepolicy_id', 'ipsecpolicy_id',
'initiator', 'local_ep_group_id', 'peer_address', 'local_id',
'mtu', 'name', 'peer_id', 'project_id', 'psk', 'peer_ep_group_id',
'route_mode', 'vpnservice_id', 'status',
is_admin_state_up='admin_state_up'
'auth_mode',
'description',
'ikepolicy_id',
'ipsecpolicy_id',
'initiator',
'local_ep_group_id',
'peer_address',
'local_id',
'mtu',
'name',
'peer_id',
'project_id',
'psk',
'peer_ep_group_id',
'route_mode',
'vpnservice_id',
'status',
is_admin_state_up='admin_state_up',
)
# Properties

View File

@@ -28,9 +28,15 @@ class VpnService(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'external_v4_ip', 'external_v6_ip', 'name', 'router_id',
'project_id', 'tenant_id', 'subnet_id',
is_admin_state_up='admin_state_up'
'description',
'external_v4_ip',
'external_v6_ip',
'name',
'router_id',
'project_id',
'tenant_id',
'subnet_id',
is_admin_state_up='admin_state_up',
)
# Properties

View File

@@ -46,13 +46,15 @@ class TestAddressGroup(base.BaseFunctionalTest):
def tearDown(self):
sot = self.user_cloud.network.delete_address_group(
self.ADDRESS_GROUP_ID)
self.ADDRESS_GROUP_ID
)
self.assertIsNone(sot)
super(TestAddressGroup, self).tearDown()
def test_find(self):
sot = self.user_cloud.network.find_address_group(
self.ADDRESS_GROUP_NAME)
self.ADDRESS_GROUP_NAME
)
self.assertEqual(self.ADDRESS_GROUP_ID, sot.id)
def test_get(self):

View File

@@ -36,13 +36,15 @@ class TestAddressScope(base.BaseFunctionalTest):
def tearDown(self):
sot = self.user_cloud.network.delete_address_scope(
self.ADDRESS_SCOPE_ID)
self.ADDRESS_SCOPE_ID
)
self.assertIsNone(sot)
super(TestAddressScope, self).tearDown()
def test_find(self):
sot = self.user_cloud.network.find_address_scope(
self.ADDRESS_SCOPE_NAME)
self.ADDRESS_SCOPE_NAME
)
self.assertEqual(self.ADDRESS_SCOPE_ID, sot.id)
def test_get(self):

View File

@@ -54,12 +54,14 @@ class TestAgentNetworks(base.BaseFunctionalTest):
def _verify_add(self, network):
net = self.user_cloud.network.dhcp_agent_hosting_networks(
self.AGENT_ID)
self.AGENT_ID
)
net_ids = [n.id for n in net]
self.assertIn(self.NETWORK_ID, net_ids)
def _verify_remove(self, network):
net = self.user_cloud.network.dhcp_agent_hosting_networks(
self.AGENT_ID)
self.AGENT_ID
)
net_ids = [n.id for n in net]
self.assertNotIn(self.NETWORK_ID, net_ids)

View File

@@ -27,7 +27,8 @@ class TestAgentRouters(base.BaseFunctionalTest):
self.ROUTER_NAME = "router-name-" + self.getUniqueString("router-name")
self.ROUTER = self.user_cloud.network.create_router(
name=self.ROUTER_NAME)
name=self.ROUTER_NAME
)
self.addCleanup(self.user_cloud.network.delete_router, self.ROUTER)
assert isinstance(self.ROUTER, router.Router)
agent_list = list(self.user_cloud.network.agents())
@@ -47,7 +48,8 @@ class TestAgentRouters(base.BaseFunctionalTest):
def test_remove_router_from_agent(self):
self.user_cloud.network.remove_router_from_agent(
self.AGENT, self.ROUTER)
self.AGENT, self.ROUTER
)
rots = self.user_cloud.network.agent_hosted_routers(self.AGENT)
routers = [router.id for router in rots]
self.assertNotIn(self.ROUTER.id, routers)

View File

@@ -32,13 +32,14 @@ class TestAutoAllocatedTopology(base.BaseFunctionalTest):
)
projects = [
o.project_id
for o in self.operator_cloud.network.networks()]
o.project_id for o in self.operator_cloud.network.networks()
]
self.PROJECT_ID = projects[0]
def tearDown(self):
res = self.operator_cloud.network.delete_auto_allocated_topology(
self.PROJECT_ID)
self.PROJECT_ID
)
self.assertIsNone(res)
super(TestAutoAllocatedTopology, self).tearDown()
@@ -63,7 +64,8 @@ class TestAutoAllocatedTopology(base.BaseFunctionalTest):
def test_show_project_option(self):
top = self.operator_cloud.network.get_auto_allocated_topology(
self.PROJECT_ID)
self.PROJECT_ID
)
network = self.operator_cloud.network.get_network(top.id)
self.assertEqual(top.project_id, network.project_id)
self.assertEqual(top.id, network.id)
@@ -73,4 +75,5 @@ class TestAutoAllocatedTopology(base.BaseFunctionalTest):
for network in networks:
if network.name == "public":
self.operator_cloud.network.update_network(
network, is_default=True)
network, is_default=True
)

View File

@@ -16,7 +16,6 @@ from openstack.tests.functional import base
class TestBGPSpeaker(base.BaseFunctionalTest):
def setUp(self):
super().setUp()
self.LOCAL_AS = 101
@@ -30,14 +29,19 @@ class TestBGPSpeaker(base.BaseFunctionalTest):
self.skipTest("Neutron BGP Dynamic Routing Extension disabled")
bgp_speaker = self.operator_cloud.network.create_bgp_speaker(
ip_version=self.IP_VERSION, local_as=self.LOCAL_AS,
name=self.SPEAKER_NAME)
ip_version=self.IP_VERSION,
local_as=self.LOCAL_AS,
name=self.SPEAKER_NAME,
)
assert isinstance(bgp_speaker, _bgp_speaker.BgpSpeaker)
self.SPEAKER = bgp_speaker
bgp_peer = self.operator_cloud.network.create_bgp_peer(
name=self.PEER_NAME, auth_type='none',
remote_as=self.REMOTE_AS, peer_ip=self.PEER_IP)
name=self.PEER_NAME,
auth_type='none',
remote_as=self.REMOTE_AS,
peer_ip=self.PEER_IP,
)
assert isinstance(bgp_peer, _bgp_peer.BgpPeer)
self.PEER = bgp_peer
@@ -62,13 +66,15 @@ class TestBGPSpeaker(base.BaseFunctionalTest):
self.assertEqual(self.LOCAL_AS, sot.local_as)
def test_list_bgp_speakers(self):
speaker_ids = [sp.id for sp in
self.operator_cloud.network.bgp_speakers()]
speaker_ids = [
sp.id for sp in self.operator_cloud.network.bgp_speakers()
]
self.assertIn(self.SPEAKER.id, speaker_ids)
def test_update_bgp_speaker(self):
sot = self.operator_cloud.network.update_bgp_speaker(
self.SPEAKER.id, advertise_floating_ip_host_routes=False)
self.SPEAKER.id, advertise_floating_ip_host_routes=False
)
self.assertFalse(sot.advertise_floating_ip_host_routes)
def test_find_bgp_peer(self):
@@ -88,18 +94,21 @@ class TestBGPSpeaker(base.BaseFunctionalTest):
def test_update_bgp_peer(self):
name = 'new_peer_name' + self.getUniqueString()
sot = self.operator_cloud.network.update_bgp_peer(
self.PEER.id, name=name)
self.PEER.id, name=name
)
self.assertEqual(name, sot.name)
def test_add_remove_peer_to_speaker(self):
self.operator_cloud.network.add_bgp_peer_to_speaker(
self.SPEAKER.id, self.PEER.id)
self.SPEAKER.id, self.PEER.id
)
sot = self.operator_cloud.network.get_bgp_speaker(self.SPEAKER.id)
self.assertEqual([self.PEER.id], sot.peers)
# Remove the peer
self.operator_cloud.network.remove_bgp_peer_from_speaker(
self.SPEAKER.id, self.PEER.id)
self.SPEAKER.id, self.PEER.id
)
sot = self.operator_cloud.network.get_bgp_speaker(self.SPEAKER.id)
self.assertEqual([], sot.peers)
@@ -107,17 +116,20 @@ class TestBGPSpeaker(base.BaseFunctionalTest):
net_name = 'my_network' + self.getUniqueString()
net = self.user_cloud.create_network(name=net_name)
self.operator_cloud.network.add_gateway_network_to_speaker(
self.SPEAKER.id, net.id)
self.SPEAKER.id, net.id
)
sot = self.operator_cloud.network.get_bgp_speaker(self.SPEAKER.id)
self.assertEqual([net.id], sot.networks)
# Remove the network
self.operator_cloud.network.remove_gateway_network_from_speaker(
self.SPEAKER.id, net.id)
self.SPEAKER.id, net.id
)
sot = self.operator_cloud.network.get_bgp_speaker(self.SPEAKER.id)
self.assertEqual([], sot.networks)
def test_get_advertised_routes_of_speaker(self):
sot = self.operator_cloud.network.get_advertised_routes_of_speaker(
self.SPEAKER.id)
self.SPEAKER.id
)
self.assertEqual({'advertised_routes': []}, sot)

View File

@@ -11,11 +11,13 @@
# under the License.
from openstack.network.v2 import bgpvpn as _bgpvpn
from openstack.network.v2 import bgpvpn_network_association as \
_bgpvpn_net_assoc
from openstack.network.v2 import (
bgpvpn_network_association as _bgpvpn_net_assoc,
)
from openstack.network.v2 import bgpvpn_port_association as _bgpvpn_port_assoc
from openstack.network.v2 import bgpvpn_router_association as \
_bgpvpn_router_assoc
from openstack.network.v2 import (
bgpvpn_router_association as _bgpvpn_router_assoc,
)
from openstack.network.v2 import network as _network
from openstack.network.v2 import port as _port
from openstack.network.v2 import router as _router
@@ -24,7 +26,6 @@ from openstack.tests.functional import base
class TestBGPVPN(base.BaseFunctionalTest):
def setUp(self):
super().setUp()
@@ -36,8 +37,8 @@ class TestBGPVPN(base.BaseFunctionalTest):
self.CIDR = "10.101.0.0/24"
self.ROUTE_DISTINGUISHERS = ['64512:1777', '64512:1888', '64512:1999']
self.VNI = 1000
self.ROUTE_TARGETS = '64512:1444',
self.IMPORT_TARGETS = '64512:1555',
self.ROUTE_TARGETS = ('64512:1444',)
self.IMPORT_TARGETS = ('64512:1555',)
self.EXPORT_TARGETS = '64512:1666'
self.TYPE = 'l3'
@@ -66,48 +67,60 @@ class TestBGPVPN(base.BaseFunctionalTest):
self.SUBNET = subnet
port = self.operator_cloud.network.create_port(
name=self.PORT_NAME, network_id=self.NETWORK.id)
name=self.PORT_NAME, network_id=self.NETWORK.id
)
assert isinstance(port, _port.Port)
self.PORT = port
router = self.operator_cloud.network.create_router(
name=self.ROUTER_NAME)
name=self.ROUTER_NAME
)
assert isinstance(router, _router.Router)
self.ROUTER = router
net_assoc = (
self.operator_cloud.network.create_bgpvpn_network_association(
self.BGPVPN, network_id=self.NETWORK.id))
assert isinstance(net_assoc,
_bgpvpn_net_assoc.BgpVpnNetworkAssociation)
self.BGPVPN, network_id=self.NETWORK.id
)
)
assert isinstance(
net_assoc, _bgpvpn_net_assoc.BgpVpnNetworkAssociation
)
self.NET_ASSOC = net_assoc
port_assoc = (
self.operator_cloud.network.create_bgpvpn_port_association(
self.BGPVPN, port_id=self.PORT.id))
assert isinstance(port_assoc,
_bgpvpn_port_assoc.BgpVpnPortAssociation)
self.BGPVPN, port_id=self.PORT.id
)
)
assert isinstance(port_assoc, _bgpvpn_port_assoc.BgpVpnPortAssociation)
self.PORT_ASSOC = port_assoc
router_assoc = (
self.operator_cloud.network.create_bgpvpn_router_association(
self.BGPVPN, router_id=self.ROUTER.id))
assert isinstance(router_assoc,
_bgpvpn_router_assoc.BgpVpnRouterAssociation)
self.BGPVPN, router_id=self.ROUTER.id
)
)
assert isinstance(
router_assoc, _bgpvpn_router_assoc.BgpVpnRouterAssociation
)
self.ROUTER_ASSOC = router_assoc
def tearDown(self):
sot = self.operator_cloud.network.delete_bgpvpn(self.BGPVPN.id)
self.assertIsNone(sot)
sot = self.operator_cloud.network.delete_bgpvpn_network_association(
self.BGPVPN.id, self.NET_ASSOC.id)
self.BGPVPN.id, self.NET_ASSOC.id
)
self.assertIsNone(sot)
sot = self.operator_cloud.network.delete_bgpvpn_port_association(
self.BGPVPN.id, self.PORT_ASSOC.id)
self.BGPVPN.id, self.PORT_ASSOC.id
)
self.assertIsNone(sot)
sot = self.operator_cloud.network.delete_bgpvpn_router_association(
self.BGPVPN.id, self.ROUTER_ASSOC.id)
self.BGPVPN.id, self.ROUTER_ASSOC.id
)
self.assertIsNone(sot)
sot = self.operator_cloud.network.delete_router(self.ROUTER)
@@ -135,47 +148,64 @@ class TestBGPVPN(base.BaseFunctionalTest):
self.assertEqual(list(self.IMPORT_TARGETS), sot.import_targets)
def test_list_bgpvpns(self):
bgpvpn_ids = [bgpvpn.id for bgpvpn in
self.operator_cloud.network.bgpvpns()]
bgpvpn_ids = [
bgpvpn.id for bgpvpn in self.operator_cloud.network.bgpvpns()
]
self.assertIn(self.BGPVPN.id, bgpvpn_ids)
def test_update_bgpvpn(self):
sot = self.operator_cloud.network.update_bgpvpn(
self.BGPVPN.id, import_targets='64512:1333')
self.BGPVPN.id, import_targets='64512:1333'
)
self.assertEqual(['64512:1333'], sot.import_targets)
def test_get_bgpvpnnetwork_association(self):
sot = self.operator_cloud.network.get_bgpvpn_network_association(
self.BGPVPN.id, self.NET_ASSOC.id)
self.BGPVPN.id, self.NET_ASSOC.id
)
self.assertEqual(self.NETWORK.id, sot.network_id)
def test_list_bgpvpn_network_associations(self):
net_assoc_ids = [
net_assoc.id for net_assoc in
self.operator_cloud.network.bgpvpn_network_associations(
self.BGPVPN.id)]
net_assoc.id
for net_assoc in (
self.operator_cloud.network.bgpvpn_network_associations(
self.BGPVPN.id
)
)
]
self.assertIn(self.NET_ASSOC.id, net_assoc_ids)
def test_get_bgpvpn_port_association(self):
sot = self.operator_cloud.network.get_bgpvpn_port_association(
self.BGPVPN.id, self.PORT_ASSOC.id)
self.BGPVPN.id, self.PORT_ASSOC.id
)
self.assertEqual(self.PORT.id, sot.port_id)
def test_list_bgpvpn_port_associations(self):
port_assoc_ids = [
port_assoc.id for port_assoc in
self.operator_cloud.network.bgpvpn_port_associations(
self.BGPVPN.id)]
port_assoc.id
for port_assoc in (
self.operator_cloud.network.bgpvpn_port_associations(
self.BGPVPN.id
)
)
]
self.assertIn(self.PORT_ASSOC.id, port_assoc_ids)
def test_get_bgpvpn_router_association(self):
sot = self.operator_cloud.network.get_bgpvpn_router_association(
self.BGPVPN.id, self.ROUTER_ASSOC.id)
self.BGPVPN.id, self.ROUTER_ASSOC.id
)
self.assertEqual(self.ROUTER.id, sot.router_id)
def test_list_bgpvpn_router_associations(self):
router_assoc_ids = [
router_assoc.id for router_assoc in
self.operator_cloud.network.bgpvpn_router_associations(
self.BGPVPN.id)]
router_assoc.id
for router_assoc in (
self.operator_cloud.network.bgpvpn_router_associations(
self.BGPVPN.id
)
)
]
self.assertIn(self.ROUTER_ASSOC.id, router_assoc_ids)

View File

@@ -31,14 +31,16 @@ class TestDVRRouter(base.BaseFunctionalTest):
self.NAME = self.getUniqueString()
self.UPDATE_NAME = self.getUniqueString()
sot = self.operator_cloud.network.create_router(
name=self.NAME, distributed=True)
name=self.NAME, distributed=True
)
assert isinstance(sot, router.Router)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
def tearDown(self):
sot = self.operator_cloud.network.delete_router(
self.ID, ignore_missing=False)
self.ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestDVRRouter, self).tearDown()
@@ -60,5 +62,6 @@ class TestDVRRouter(base.BaseFunctionalTest):
def test_update(self):
sot = self.operator_cloud.network.update_router(
self.ID, name=self.UPDATE_NAME)
self.ID, name=self.UPDATE_NAME
)
self.assertEqual(self.UPDATE_NAME, sot.name)

View File

@@ -34,15 +34,18 @@ class TestFirewallPolicyRuleAssociations(base.BaseFunctionalTest):
if not self.user_cloud._has_neutron_extension("fwaas_v2"):
self.skipTest("fwaas_v2 service not supported by cloud")
rul1 = self.user_cloud.network.create_firewall_rule(
name=self.RULE1_NAME)
name=self.RULE1_NAME
)
assert isinstance(rul1, firewall_rule.FirewallRule)
self.assertEqual(self.RULE1_NAME, rul1.name)
rul2 = self.user_cloud.network.create_firewall_rule(
name=self.RULE2_NAME)
name=self.RULE2_NAME
)
assert isinstance(rul2, firewall_rule.FirewallRule)
self.assertEqual(self.RULE2_NAME, rul2.name)
pol = self.user_cloud.network.create_firewall_policy(
name=self.POLICY_NAME)
name=self.POLICY_NAME
)
assert isinstance(pol, firewall_policy.FirewallPolicy)
self.assertEqual(self.POLICY_NAME, pol.name)
self.RULE1_ID = rul1.id

View File

@@ -97,8 +97,7 @@ class TestFlavor(base.BaseFunctionalTest):
)
self.assertIsNotNone(response)
response = self.operator_cloud.network \
.disassociate_flavor_from_service_profile(
self.ID, self.service_profiles.id
)
response = self.operator_cloud.network.disassociate_flavor_from_service_profile( # noqa: E501
self.ID, self.service_profiles.id
)
self.assertIsNone(response)

View File

@@ -95,10 +95,9 @@ class TestFloatingIP(base.BaseFunctionalTest):
fip_args = dict(
floating_network_id=self.EXT_NET_ID,
)
if (
self.user_cloud._has_neutron_extension("dns-integration")
and self.user_cloud.has_service("dns")
):
if self.user_cloud._has_neutron_extension(
"dns-integration"
) and self.user_cloud.has_service("dns"):
self.is_dns_supported = True
fip_args.update(
dict(dns_domain=self.DNS_DOMAIN, dns_name=self.DNS_NAME)

View File

@@ -55,8 +55,8 @@ class TestLocalIP(base.BaseFunctionalTest):
def test_list(self):
names = [
local_ip.name
for local_ip in self.user_cloud.network.local_ips()]
local_ip.name for local_ip in self.user_cloud.network.local_ips()
]
self.assertIn(self.LOCAL_IP_NAME, names)
def test_update(self):

View File

@@ -31,12 +31,13 @@ class TestLocalIPAssociation(base.BaseFunctionalTest):
self.LOCAL_IP_ID = self.getUniqueString()
self.FIXED_PORT_ID = self.getUniqueString()
self.FIXED_IP = self.getUniqueString()
local_ip_association = self.user_cloud.network \
.create_local_ip_association(
local_ip_association = (
self.user_cloud.network.create_local_ip_association(
local_ip=self.LOCAL_IP_ID,
fixed_port_id=self.FIXED_PORT_ID,
fixed_ip=self.FIXED_IP,
)
)
assert isinstance(
local_ip_association, _local_ip_association.LocalIPAssociation
)

View File

@@ -12,7 +12,7 @@
from openstack.network.v2 import (
qos_bandwidth_limit_rule as _qos_bandwidth_limit_rule
qos_bandwidth_limit_rule as _qos_bandwidth_limit_rule,
)
from openstack.tests.functional import base

View File

@@ -12,7 +12,7 @@
from openstack.network.v2 import (
qos_dscp_marking_rule as _qos_dscp_marking_rule
qos_dscp_marking_rule as _qos_dscp_marking_rule,
)
from openstack.tests.functional import base

View File

@@ -12,7 +12,7 @@
from openstack.network.v2 import (
qos_minimum_bandwidth_rule as _qos_minimum_bandwidth_rule
qos_minimum_bandwidth_rule as _qos_minimum_bandwidth_rule,
)
from openstack.tests.functional import base
@@ -35,7 +35,8 @@ class TestQoSMinimumBandwidthRule(base.BaseFunctionalTest):
# Skip the tests if qos-bw-limit-direction extension is not enabled.
if not self.operator_cloud.network.find_extension(
"qos-bw-limit-direction"):
"qos-bw-limit-direction"
):
self.skipTest("Network qos-bw-limit-direction extension disabled")
self.QOS_POLICY_NAME = self.getUniqueString()
@@ -45,12 +46,13 @@ class TestQoSMinimumBandwidthRule(base.BaseFunctionalTest):
shared=self.QOS_IS_SHARED,
)
self.QOS_POLICY_ID = qos_policy.id
qos_min_bw_rule = self.operator_cloud.network \
.create_qos_minimum_bandwidth_rule(
qos_min_bw_rule = (
self.operator_cloud.network.create_qos_minimum_bandwidth_rule(
self.QOS_POLICY_ID,
direction=self.RULE_DIRECTION,
min_kbps=self.RULE_MIN_KBPS,
)
)
assert isinstance(
qos_min_bw_rule,
_qos_minimum_bandwidth_rule.QoSMinimumBandwidthRule,
@@ -64,7 +66,8 @@ class TestQoSMinimumBandwidthRule(base.BaseFunctionalTest):
self.RULE_ID, self.QOS_POLICY_ID
)
qos_policy = self.operator_cloud.network.delete_qos_policy(
self.QOS_POLICY_ID)
self.QOS_POLICY_ID
)
self.assertIsNone(rule)
self.assertIsNone(qos_policy)
super(TestQoSMinimumBandwidthRule, self).tearDown()

View File

@@ -12,7 +12,7 @@
from openstack.network.v2 import (
qos_minimum_packet_rate_rule as _qos_minimum_packet_rate_rule
qos_minimum_packet_rate_rule as _qos_minimum_packet_rate_rule,
)
from openstack.tests.functional import base

View File

@@ -25,12 +25,14 @@ class TestQoSRuleType(base.BaseFunctionalTest):
# Skip the tests if qos-rule-type-details extension is not enabled.
if not self.operator_cloud.network.find_extension(
"qos-rule-type-details"):
"qos-rule-type-details"
):
self.skipTest("Network qos-rule-type-details extension disabled")
def test_find(self):
sot = self.operator_cloud.network.find_qos_rule_type(
self.QOS_RULE_TYPE)
self.QOS_RULE_TYPE
)
self.assertEqual(self.QOS_RULE_TYPE, sot.type)
self.assertIsInstance(sot.drivers, list)

View File

@@ -55,18 +55,20 @@ class TestServiceProfile(base.BaseFunctionalTest):
def test_find(self):
self.user_cloud.network.find_service_profile(
name_or_id="not_existing",
ignore_missing=True)
name_or_id="not_existing", ignore_missing=True
)
if self.operator_cloud and self.ID:
service_profiles = self.operator_cloud.network \
.find_service_profile(self.ID)
service_profiles = (
self.operator_cloud.network.find_service_profile(self.ID)
)
self.assertEqual(self.METAINFO, service_profiles.meta_info)
def test_get(self):
if not self.ID:
self.skipTest("ServiceProfile was not created")
service_profiles = self.operator_cloud.network.get_service_profile(
self.ID)
self.ID
)
self.assertEqual(self.METAINFO, service_profiles.meta_info)
self.assertEqual(
self.SERVICE_PROFILE_DESCRIPTION, service_profiles.description
@@ -86,8 +88,8 @@ class TestServiceProfile(base.BaseFunctionalTest):
# Test as operator
if self.operator_cloud:
metainfos = [
f.meta_info for f in
self.operator_cloud.network.service_profiles()
f.meta_info
for f in self.operator_cloud.network.service_profiles()
]
if self.ID:
self.assertIn(self.METAINFO, metainfos)

View File

@@ -18,7 +18,6 @@ from openstack.tests.functional import base
class TestTapService(base.BaseFunctionalTest):
def setUp(self):
super().setUp()
if not self.user_cloud.network.find_extension("taas"):
@@ -35,32 +34,38 @@ class TestTapService(base.BaseFunctionalTest):
self.FLOW_NET_ID = net.id
port = self.user_cloud.network.create_port(
network_id=self.SERVICE_NET_ID)
network_id=self.SERVICE_NET_ID
)
assert isinstance(port, _port.Port)
self.SERVICE_PORT_ID = port.id
port = self.user_cloud.network.create_port(
network_id=self.FLOW_NET_ID)
port = self.user_cloud.network.create_port(network_id=self.FLOW_NET_ID)
assert isinstance(port, _port.Port)
self.FLOW_PORT_ID = port.id
tap_service = self.user_cloud.network.create_tap_service(
name=self.TAP_S_NAME, port_id=self.SERVICE_PORT_ID)
name=self.TAP_S_NAME, port_id=self.SERVICE_PORT_ID
)
assert isinstance(tap_service, _tap_service.TapService)
self.TAP_SERVICE = tap_service
tap_flow = self.user_cloud.network.create_tap_flow(
name=self.TAP_F_NAME, tap_service_id=self.TAP_SERVICE.id,
source_port=self.FLOW_PORT_ID, direction='BOTH')
name=self.TAP_F_NAME,
tap_service_id=self.TAP_SERVICE.id,
source_port=self.FLOW_PORT_ID,
direction='BOTH',
)
assert isinstance(tap_flow, _tap_flow.TapFlow)
self.TAP_FLOW = tap_flow
def tearDown(self):
sot = self.user_cloud.network.delete_tap_flow(self.TAP_FLOW.id,
ignore_missing=False)
sot = self.user_cloud.network.delete_tap_flow(
self.TAP_FLOW.id, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.user_cloud.network.delete_tap_service(self.TAP_SERVICE.id,
ignore_missing=False)
sot = self.user_cloud.network.delete_tap_service(
self.TAP_SERVICE.id, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.user_cloud.network.delete_port(self.SERVICE_PORT_ID)
self.assertIsNone(sot)
@@ -83,14 +88,16 @@ class TestTapService(base.BaseFunctionalTest):
self.assertEqual(self.TAP_S_NAME, sot.name)
def test_list_tap_services(self):
tap_service_ids = [ts.id for ts in
self.user_cloud.network.tap_services()]
tap_service_ids = [
ts.id for ts in self.user_cloud.network.tap_services()
]
self.assertIn(self.TAP_SERVICE.id, tap_service_ids)
def test_update_tap_service(self):
description = 'My tap service'
sot = self.user_cloud.network.update_tap_service(
self.TAP_SERVICE.id, description=description)
self.TAP_SERVICE.id, description=description
)
self.assertEqual(description, sot.description)
def test_find_tap_flow(self):
@@ -108,12 +115,12 @@ class TestTapService(base.BaseFunctionalTest):
self.assertEqual('BOTH', sot.direction)
def test_list_tap_flows(self):
tap_flow_ids = [tf.id for tf in
self.user_cloud.network.tap_flows()]
tap_flow_ids = [tf.id for tf in self.user_cloud.network.tap_flows()]
self.assertIn(self.TAP_FLOW.id, tap_flow_ids)
def test_update_tap_flow(self):
description = 'My tap flow'
sot = self.user_cloud.network.update_tap_flow(
self.TAP_FLOW.id, description=description)
self.TAP_FLOW.id, description=description
)
self.assertEqual(description, sot.description)

View File

@@ -39,9 +39,7 @@ class TestVpnIkePolicy(base.BaseFunctionalTest):
super(TestVpnIkePolicy, self).tearDown()
def test_list(self):
policies = [
f.name for f in
self.user_cloud.network.vpn_ike_policies()]
policies = [f.name for f in self.user_cloud.network.vpn_ike_policies()]
self.assertIn(self.IKEPOLICY_NAME, policies)
def test_find(self):

View File

@@ -23,7 +23,6 @@ EXAMPLE = {
class TestVersion(base.TestCase):
def test_basic(self):
sot = version.Version()
self.assertEqual('version', sot.resource_key)

View File

@@ -20,12 +20,11 @@ EXAMPLE = {
'name': '1',
'description': '2',
'project_id': '3',
'addresses': ['10.0.0.1/32']
'addresses': ['10.0.0.1/32'],
}
class TestAddressGroup(base.TestCase):
def test_basic(self):
sot = address_group.AddressGroup()
self.assertEqual('address_group', sot.resource_key)
@@ -37,14 +36,18 @@ class TestAddressGroup(base.TestCase):
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertDictEqual({"name": "name",
"description": "description",
"project_id": "project_id",
"sort_key": "sort_key",
"sort_dir": "sort_dir",
"limit": "limit",
"marker": "marker"},
sot._query_mapping._mapping)
self.assertDictEqual(
{
"name": "name",
"description": "description",
"project_id": "project_id",
"sort_key": "sort_key",
"sort_dir": "sort_dir",
"limit": "limit",
"marker": "marker",
},
sot._query_mapping._mapping,
)
def test_make_it(self):
sot = address_group.AddressGroup(**EXAMPLE)

View File

@@ -25,7 +25,6 @@ EXAMPLE = {
class TestAddressScope(base.TestCase):
def test_basic(self):
sot = address_scope.AddressScope()
self.assertEqual('address_scope', sot.resource_key)

View File

@@ -31,12 +31,11 @@ EXAMPLE = {
'resources_synced': False,
'started_at': '2016-07-09T12:14:57.233772',
'topic': 'test-topic',
'ha_state': 'active'
'ha_state': 'active',
}
class TestAgent(base.TestCase):
def test_basic(self):
sot = agent.Agent()
self.assertEqual('agent', sot.resource_key)
@@ -53,8 +52,7 @@ class TestAgent(base.TestCase):
self.assertTrue(sot.is_admin_state_up)
self.assertEqual(EXAMPLE['agent_type'], sot.agent_type)
self.assertTrue(sot.is_alive)
self.assertEqual(EXAMPLE['availability_zone'],
sot.availability_zone)
self.assertEqual(EXAMPLE['availability_zone'], sot.availability_zone)
self.assertEqual(EXAMPLE['binary'], sot.binary)
self.assertEqual(EXAMPLE['configurations'], sot.configuration)
self.assertEqual(EXAMPLE['created_at'], sot.created_at)
@@ -79,8 +77,7 @@ class TestAgent(base.TestCase):
self.assertEqual(response.body, net.add_agent_to_network(sess, **body))
url = 'agents/IDENTIFIER/dhcp-networks'
sess.post.assert_called_with(url,
json=body)
sess.post.assert_called_with(url, json=body)
def test_remove_agent_from_network(self):
# Remove agent from agent
@@ -90,8 +87,9 @@ class TestAgent(base.TestCase):
self.assertIsNone(net.remove_agent_from_network(sess, network_id))
body = {'network_id': {}}
sess.delete.assert_called_with('agents/IDENTIFIER/dhcp-networks/',
json=body)
sess.delete.assert_called_with(
'agents/IDENTIFIER/dhcp-networks/', json=body
)
def test_add_router_to_agent(self):
# Add router to agent
@@ -102,12 +100,12 @@ class TestAgent(base.TestCase):
sess = mock.Mock()
sess.post = mock.Mock(return_value=response)
router_id = '1'
self.assertEqual(response.body,
sot.add_router_to_agent(sess, router_id))
self.assertEqual(
response.body, sot.add_router_to_agent(sess, router_id)
)
body = {'router_id': router_id}
url = 'agents/IDENTIFIER/l3-routers'
sess.post.assert_called_with(url,
json=body)
sess.post.assert_called_with(url, json=body)
def test_remove_router_from_agent(self):
# Remove router from agent
@@ -117,17 +115,16 @@ class TestAgent(base.TestCase):
self.assertIsNone(sot.remove_router_from_agent(sess, router_id))
body = {'router_id': {}}
sess.delete.assert_called_with('agents/IDENTIFIER/l3-routers/',
json=body)
sess.delete.assert_called_with(
'agents/IDENTIFIER/l3-routers/', json=body
)
def test_get_bgp_speakers_hosted_by_dragent(self):
sot = agent.Agent(**EXAMPLE)
sess = mock.Mock()
response = mock.Mock()
response.body = {
'bgp_speakers': [
{'name': 'bgp_speaker_1', 'ip_version': 4}
]
'bgp_speakers': [{'name': 'bgp_speaker_1', 'ip_version': 4}]
}
response.json = mock.Mock(return_value=response.body)
response.status_code = 200
@@ -139,7 +136,6 @@ class TestAgent(base.TestCase):
class TestNetworkHostingDHCPAgent(base.TestCase):
def test_basic(self):
net = agent.NetworkHostingDHCPAgent()
self.assertEqual('agent', net.resource_key)
@@ -154,7 +150,6 @@ class TestNetworkHostingDHCPAgent(base.TestCase):
class TestRouterL3Agent(base.TestCase):
def test_basic(self):
sot = agent.RouterL3Agent()
self.assertEqual('agent', sot.resource_key)

View File

@@ -21,7 +21,6 @@ EXAMPLE = {
class TestAutoAllocatedTopology(base.TestCase):
def test_basic(self):
topo = auto_allocated_topology.AutoAllocatedTopology
self.assertEqual('auto_allocated_topology', topo.resource_key)

View File

@@ -24,7 +24,6 @@ EXAMPLE = {
class TestAvailabilityZone(base.TestCase):
def test_basic(self):
sot = availability_zone.AvailabilityZone()
self.assertEqual('availability_zone', sot.resource_key)

View File

@@ -21,12 +21,11 @@ EXAMPLE = {
'name': 'bgp-peer',
'peer_ip': '10.0.0.3',
'id': IDENTIFIER,
'project_id': '42'
'project_id': '42',
}
class TestBgpPeer(base.TestCase):
def test_basic(self):
sot = bgp_peer.BgpPeer()
self.assertEqual('bgp_peer', sot.resource_key)
@@ -48,7 +47,9 @@ class TestBgpPeer(base.TestCase):
self.assertEqual(EXAMPLE['project_id'], sot.project_id)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
},
sot._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
},
sot._query_mapping._mapping,
)

View File

@@ -26,7 +26,7 @@ EXAMPLE = {
'advertise_tenant_networks': 'true',
'local_as': 1000,
'networks': [],
'project_id': '42'
'project_id': '42',
}
@@ -47,17 +47,21 @@ class TestBgpSpeaker(base.TestCase):
self.assertEqual(EXAMPLE['id'], sot.id)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertEqual(EXAMPLE['ip_version'], sot.ip_version)
self.assertEqual(EXAMPLE['advertise_floating_ip_host_routes'],
sot.advertise_floating_ip_host_routes)
self.assertEqual(
EXAMPLE['advertise_floating_ip_host_routes'],
sot.advertise_floating_ip_host_routes,
)
self.assertEqual(EXAMPLE['local_as'], sot.local_as)
self.assertEqual(EXAMPLE['networks'], sot.networks)
self.assertEqual(EXAMPLE['project_id'], sot.project_id)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
},
sot._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
},
sot._query_mapping._mapping,
)
def test_add_bgp_peer(self):
sot = bgp_speaker.BgpSpeaker(**EXAMPLE)
@@ -143,9 +147,7 @@ class TestBgpSpeaker(base.TestCase):
sot = bgp_speaker.BgpSpeaker(**EXAMPLE)
response = mock.Mock()
response.body = {
'agents': [
{'binary': 'neutron-bgp-dragent', 'alive': True}
]
'agents': [{'binary': 'neutron-bgp-dragent', 'alive': True}]
}
response.json = mock.Mock(return_value=response.body)
response.status_code = 200

View File

@@ -35,7 +35,6 @@ EXAMPLE = {
class TestBgpVpn(base.TestCase):
def test_basic(self):
sot = bgpvpn.BgpVpn()
self.assertEqual('bgpvpn', sot.resource_key)
@@ -52,38 +51,37 @@ class TestBgpVpn(base.TestCase):
self.assertEqual(EXAMPLE['id'], sot.id)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertEqual(EXAMPLE['project_id'], sot.project_id)
self.assertEqual(EXAMPLE['route_distinguishers'],
sot.route_distinguishers)
self.assertEqual(
EXAMPLE['route_distinguishers'], sot.route_distinguishers
)
self.assertEqual(EXAMPLE['route_targets'], sot.route_targets)
self.assertEqual(EXAMPLE['import_targets'], sot.import_targets)
self.assertEqual(EXAMPLE['export_targets'], sot.export_targets)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
},
sot._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
},
sot._query_mapping._mapping,
)
def test_create_bgpvpn_network_association(self):
test_bpgvpn = bgpvpn.BgpVpn(**EXAMPLE)
test_net = network.Network(**{'name': 'foo_net', 'id': NET_ID})
sot = bgpvpn_network_association.BgpVpnNetworkAssociation(
bgpvn_id=test_bpgvpn.id,
network_id=test_net.id
bgpvn_id=test_bpgvpn.id, network_id=test_net.id
)
self.assertEqual(test_net.id, sot.network_id)
self.assertEqual(test_bpgvpn.id, sot.bgpvn_id)
def test_create_bgpvpn_port_association(self):
test_bpgvpn = bgpvpn.BgpVpn(**EXAMPLE)
test_port = port.Port(**{
'name': 'foo_port',
'id': PORT_ID,
'network_id': NET_ID
})
test_port = port.Port(
**{'name': 'foo_port', 'id': PORT_ID, 'network_id': NET_ID}
)
sot = bgpvpn_port_association.BgpVpnPortAssociation(
bgpvn_id=test_bpgvpn.id,
port_id=test_port.id
bgpvn_id=test_bpgvpn.id, port_id=test_port.id
)
self.assertEqual(test_port.id, sot.port_id)
self.assertEqual(test_bpgvpn.id, sot.bgpvn_id)
@@ -92,8 +90,7 @@ class TestBgpVpn(base.TestCase):
test_bpgvpn = bgpvpn.BgpVpn(**EXAMPLE)
test_router = router.Router(**{'name': 'foo_port'})
sot = bgpvpn_router_association.BgpVpnRouterAssociation(
bgpvn_id=test_bpgvpn.id,
router_id=test_router.id
bgpvn_id=test_bpgvpn.id, router_id=test_router.id
)
self.assertEqual(test_router.id, sot.router_id)
self.assertEqual(test_bpgvpn.id, sot.bgpvn_id)

View File

@@ -25,7 +25,6 @@ EXAMPLE = {
class TestExtension(base.TestCase):
def test_basic(self):
sot = extension.Extension()
self.assertEqual('extension', sot.resource_key)

View File

@@ -32,7 +32,6 @@ EXAMPLE = {
class TestFirewallGroup(testtools.TestCase):
def test_basic(self):
sot = firewall_group.FirewallGroup()
self.assertEqual('firewall_group', sot.resource_key)
@@ -48,10 +47,13 @@ class TestFirewallGroup(testtools.TestCase):
sot = firewall_group.FirewallGroup(**EXAMPLE)
self.assertEqual(EXAMPLE['description'], sot.description)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertEqual(EXAMPLE['egress_firewall_policy_id'],
sot.egress_firewall_policy_id)
self.assertEqual(EXAMPLE['ingress_firewall_policy_id'],
sot.ingress_firewall_policy_id)
self.assertEqual(
EXAMPLE['egress_firewall_policy_id'], sot.egress_firewall_policy_id
)
self.assertEqual(
EXAMPLE['ingress_firewall_policy_id'],
sot.ingress_firewall_policy_id,
)
self.assertEqual(EXAMPLE['shared'], sot.shared)
self.assertEqual(EXAMPLE['status'], sot.status)
self.assertEqual(list, type(sot.ports))

View File

@@ -21,15 +21,16 @@ from openstack.network.v2 import firewall_policy
EXAMPLE = {
'description': '1',
'name': '2',
'firewall_rules': ['a30b0ec2-a468-4b1c-8dbf-928ded2a57a8',
'8d562e98-24f3-46e1-bbf3-d9347c0a67ee'],
'firewall_rules': [
'a30b0ec2-a468-4b1c-8dbf-928ded2a57a8',
'8d562e98-24f3-46e1-bbf3-d9347c0a67ee',
],
'shared': True,
'project_id': '4',
}
class TestFirewallPolicy(testtools.TestCase):
def test_basic(self):
sot = firewall_policy.FirewallPolicy()
self.assertEqual('firewall_policy', sot.resource_key)

View File

@@ -34,7 +34,6 @@ EXAMPLE = {
class TestFirewallRule(testtools.TestCase):
def test_basic(self):
sot = firewall_rule.FirewallRule()
self.assertEqual('firewall_rule', sot.resource_key)
@@ -50,15 +49,15 @@ class TestFirewallRule(testtools.TestCase):
sot = firewall_rule.FirewallRule(**EXAMPLE)
self.assertEqual(EXAMPLE['action'], sot.action)
self.assertEqual(EXAMPLE['description'], sot.description)
self.assertEqual(EXAMPLE['destination_ip_address'],
sot.destination_ip_address)
self.assertEqual(
EXAMPLE['destination_ip_address'], sot.destination_ip_address
)
self.assertEqual(EXAMPLE['destination_port'], sot.destination_port)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertEqual(EXAMPLE['enabled'], sot.enabled)
self.assertEqual(EXAMPLE['ip_version'], sot.ip_version)
self.assertEqual(EXAMPLE['protocol'], sot.protocol)
self.assertEqual(EXAMPLE['shared'], sot.shared)
self.assertEqual(EXAMPLE['source_ip_address'],
sot.source_ip_address)
self.assertEqual(EXAMPLE['source_ip_address'], sot.source_ip_address)
self.assertEqual(EXAMPLE['source_port'], sot.source_port)
self.assertEqual(EXAMPLE['project_id'], sot.project_id)

View File

@@ -52,13 +52,17 @@ class TestFlavor(base.TestCase):
def test_make_it_with_optional(self):
flavors = flavor.Flavor(**EXAMPLE_WITH_OPTIONAL)
self.assertEqual(EXAMPLE_WITH_OPTIONAL['name'], flavors.name)
self.assertEqual(EXAMPLE_WITH_OPTIONAL['service_type'],
flavors.service_type)
self.assertEqual(EXAMPLE_WITH_OPTIONAL['description'],
flavors.description)
self.assertEqual(
EXAMPLE_WITH_OPTIONAL['service_type'], flavors.service_type
)
self.assertEqual(
EXAMPLE_WITH_OPTIONAL['description'], flavors.description
)
self.assertEqual(EXAMPLE_WITH_OPTIONAL['enabled'], flavors.is_enabled)
self.assertEqual(EXAMPLE_WITH_OPTIONAL['service_profiles'],
flavors.service_profile_ids)
self.assertEqual(
EXAMPLE_WITH_OPTIONAL['service_profiles'],
flavors.service_profile_ids,
)
def test_associate_flavor_with_service_profile(self):
flav = flavor.Flavor(EXAMPLE)
@@ -71,12 +75,12 @@ class TestFlavor(base.TestCase):
sess.post = mock.Mock(return_value=response)
flav.id = 'IDENTIFIER'
self.assertEqual(
response.body, flav.associate_flavor_with_service_profile(
sess, '1'))
response.body,
flav.associate_flavor_with_service_profile(sess, '1'),
)
url = 'flavors/IDENTIFIER/service_profiles'
sess.post.assert_called_with(url,
json=response.body)
sess.post.assert_called_with(url, json=response.body)
def test_disassociate_flavor_from_service_profile(self):
flav = flavor.Flavor(EXAMPLE)
@@ -86,8 +90,10 @@ class TestFlavor(base.TestCase):
sess.post = mock.Mock(return_value=response)
flav.id = 'IDENTIFIER'
self.assertEqual(
None, flav.disassociate_flavor_from_service_profile(
sess, '1'))
None, flav.disassociate_flavor_from_service_profile(sess, '1')
)
url = 'flavors/IDENTIFIER/service_profiles/1'
sess.delete.assert_called_with(url,)
sess.delete.assert_called_with(
url,
)

View File

@@ -34,12 +34,11 @@ EXAMPLE = {
'revision_number': 12,
'updated_at': '13',
'subnet_id': '14',
'tags': ['15', '16']
'tags': ['15', '16'],
}
class TestFloatingIP(base.TestCase):
def test_basic(self):
sot = floating_ip.FloatingIP()
self.assertEqual('floatingip', sot.resource_key)
@@ -55,10 +54,12 @@ class TestFloatingIP(base.TestCase):
sot = floating_ip.FloatingIP(**EXAMPLE)
self.assertEqual(EXAMPLE['created_at'], sot.created_at)
self.assertEqual(EXAMPLE['fixed_ip_address'], sot.fixed_ip_address)
self.assertEqual(EXAMPLE['floating_ip_address'],
sot.floating_ip_address)
self.assertEqual(EXAMPLE['floating_network_id'],
sot.floating_network_id)
self.assertEqual(
EXAMPLE['floating_ip_address'], sot.floating_ip_address
)
self.assertEqual(
EXAMPLE['floating_network_id'], sot.floating_network_id
)
self.assertEqual(EXAMPLE['id'], sot.id)
self.assertEqual(EXAMPLE['port_id'], sot.port_id)
self.assertEqual(EXAMPLE['project_id'], sot.project_id)
@@ -73,24 +74,26 @@ class TestFloatingIP(base.TestCase):
self.assertEqual(EXAMPLE['tags'], sot.tags)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'description': 'description',
'project_id': 'project_id',
'tenant_id': 'project_id',
'status': 'status',
'port_id': 'port_id',
'subnet_id': 'subnet_id',
'router_id': 'router_id',
'fixed_ip_address': 'fixed_ip_address',
'floating_ip_address': 'floating_ip_address',
'floating_network_id': 'floating_network_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
},
sot._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'description': 'description',
'project_id': 'project_id',
'tenant_id': 'project_id',
'status': 'status',
'port_id': 'port_id',
'subnet_id': 'subnet_id',
'router_id': 'router_id',
'fixed_ip_address': 'fixed_ip_address',
'floating_ip_address': 'floating_ip_address',
'floating_network_id': 'floating_network_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
},
sot._query_mapping._mapping,
)
def test_find_available(self):
mock_session = mock.Mock(spec=proxy.Proxy)
@@ -111,7 +114,8 @@ class TestFloatingIP(base.TestCase):
floating_ip.FloatingIP.base_path,
headers={'Accept': 'application/json'},
params={},
microversion=None)
microversion=None,
)
def test_find_available_nada(self):
mock_session = mock.Mock(spec=proxy.Proxy)

View File

@@ -33,7 +33,6 @@ EXAMPLE = {
class TestHealthMonitor(base.TestCase):
def test_basic(self):
sot = health_monitor.HealthMonitor()
self.assertEqual('healthmonitor', sot.resource_key)

View File

@@ -18,19 +18,18 @@ EXAMPLE = {
'id': 'ct_helper_id',
'protocol': 'udp',
'port': 69,
'helper': 'tftp'
'helper': 'tftp',
}
class TestL3ConntrackHelper(base.TestCase):
def test_basic(self):
sot = l3_conntrack_helper.ConntrackHelper()
self.assertEqual('conntrack_helper', sot.resource_key)
self.assertEqual('conntrack_helpers', sot.resources_key)
self.assertEqual(
'/routers/%(router_id)s/conntrack_helpers',
sot.base_path)
'/routers/%(router_id)s/conntrack_helpers', sot.base_path
)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_fetch)
self.assertTrue(sot.allow_commit)

View File

@@ -33,7 +33,6 @@ EXAMPLE = {
class TestListener(base.TestCase):
def test_basic(self):
sot = listener.Listener()
self.assertEqual('listener', sot.resource_key)
@@ -58,7 +57,7 @@ class TestListener(base.TestCase):
self.assertEqual(EXAMPLE['project_id'], sot.project_id)
self.assertEqual(EXAMPLE['protocol'], sot.protocol)
self.assertEqual(EXAMPLE['protocol_port'], sot.protocol_port)
self.assertEqual(EXAMPLE['default_tls_container_ref'],
sot.default_tls_container_ref)
self.assertEqual(EXAMPLE['sni_container_refs'],
sot.sni_container_refs)
self.assertEqual(
EXAMPLE['default_tls_container_ref'], sot.default_tls_container_ref
)
self.assertEqual(EXAMPLE['sni_container_refs'], sot.sni_container_refs)

View File

@@ -33,7 +33,6 @@ EXAMPLE = {
class TestLoadBalancer(base.TestCase):
def test_basic(self):
sot = load_balancer.LoadBalancer()
self.assertEqual('loadbalancer', sot.resource_key)
@@ -53,8 +52,9 @@ class TestLoadBalancer(base.TestCase):
self.assertEqual(EXAMPLE['listeners'], sot.listener_ids)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertEqual(EXAMPLE['operating_status'], sot.operating_status)
self.assertEqual(EXAMPLE['provisioning_status'],
sot.provisioning_status)
self.assertEqual(
EXAMPLE['provisioning_status'], sot.provisioning_status
)
self.assertEqual(EXAMPLE['project_id'], sot.project_id)
self.assertEqual(EXAMPLE['vip_address'], sot.vip_address)
self.assertEqual(EXAMPLE['vip_subnet_id'], sot.vip_subnet_id)

View File

@@ -33,7 +33,6 @@ EXAMPLE = {
class TestLocalIP(base.TestCase):
def test_basic(self):
sot = local_ip.LocalIP()
self.assertEqual('local_ip', sot.resource_key)
@@ -45,18 +44,22 @@ class TestLocalIP(base.TestCase):
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertDictEqual({"name": "name",
"description": "description",
"project_id": "project_id",
"network_id": "network_id",
"local_port_id": "local_port_id",
"local_ip_address": "local_ip_address",
"ip_mode": "ip_mode",
"sort_key": "sort_key",
"sort_dir": "sort_dir",
"limit": "limit",
"marker": "marker"},
sot._query_mapping._mapping)
self.assertDictEqual(
{
"name": "name",
"description": "description",
"project_id": "project_id",
"network_id": "network_id",
"local_port_id": "local_port_id",
"local_ip_address": "local_ip_address",
"ip_mode": "ip_mode",
"sort_key": "sort_key",
"sort_dir": "sort_dir",
"limit": "limit",
"marker": "marker",
},
sot._query_mapping._mapping,
)
def test_make_it(self):
sot = local_ip.LocalIP(**EXAMPLE)

View File

@@ -26,13 +26,13 @@ EXAMPLE = {
class TestLocalIP(base.TestCase):
def test_basic(self):
sot = local_ip_association.LocalIPAssociation()
self.assertEqual('port_association', sot.resource_key)
self.assertEqual('port_associations', sot.resources_key)
self.assertEqual('/local_ips/%(local_ip_id)s/port_associations',
sot.base_path)
self.assertEqual(
'/local_ips/%(local_ip_id)s/port_associations', sot.base_path
)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_fetch)
self.assertTrue(sot.allow_commit)
@@ -40,12 +40,15 @@ class TestLocalIP(base.TestCase):
self.assertTrue(sot.allow_list)
self.assertDictEqual(
{'fixed_port_id': 'fixed_port_id',
'fixed_ip': 'fixed_ip',
'host': 'host',
'limit': 'limit',
'marker': 'marker'},
sot._query_mapping._mapping)
{
'fixed_port_id': 'fixed_port_id',
'fixed_ip': 'fixed_ip',
'host': 'host',
'limit': 'limit',
'marker': 'marker',
},
sot._query_mapping._mapping,
)
def test_make_it(self):
sot = local_ip_association.LocalIPAssociation(**EXAMPLE)

View File

@@ -25,7 +25,6 @@ EXAMPLE = {
class TestMeteringLabel(base.TestCase):
def test_basic(self):
sot = metering_label.MeteringLabel()
self.assertEqual('metering_label', sot.resource_key)

View File

@@ -26,7 +26,6 @@ EXAMPLE = {
class TestMeteringLabelRule(base.TestCase):
def test_basic(self):
sot = metering_label_rule.MeteringLabelRule()
self.assertEqual('metering_label_rule', sot.resource_key)
@@ -57,12 +56,16 @@ class TestMeteringLabelRule(base.TestCase):
self.assertFalse(sot.is_excluded)
self.assertEqual(custom_example['id'], sot.id)
self.assertEqual(
custom_example['metering_label_id'], sot.metering_label_id)
custom_example['metering_label_id'], sot.metering_label_id
)
self.assertEqual(custom_example['project_id'], sot.project_id)
self.assertEqual(
custom_example['remote_ip_prefix'], sot.remote_ip_prefix)
custom_example['remote_ip_prefix'], sot.remote_ip_prefix
)
self.assertEqual(
custom_example['source_ip_prefix'], sot.source_ip_prefix)
custom_example['source_ip_prefix'], sot.source_ip_prefix
)
self.assertEqual(
custom_example['destination_ip_prefix'], sot.destination_ip_prefix)
custom_example['destination_ip_prefix'], sot.destination_ip_prefix
)

View File

@@ -26,7 +26,6 @@ EXAMPLE = {
class TestNDPProxy(base.TestCase):
def test_basic(self):
sot = ndp_proxy.NDPProxy()
self.assertEqual('ndp_proxy', sot.resource_key)

View File

@@ -46,7 +46,6 @@ EXAMPLE = {
class TestNetwork(base.TestCase):
def test_basic(self):
sot = network.Network()
self.assertEqual('network', sot.resource_key)
@@ -61,29 +60,34 @@ class TestNetwork(base.TestCase):
def test_make_it(self):
sot = network.Network(**EXAMPLE)
self.assertTrue(sot.is_admin_state_up)
self.assertEqual(EXAMPLE['availability_zone_hints'],
sot.availability_zone_hints)
self.assertEqual(EXAMPLE['availability_zones'],
sot.availability_zones)
self.assertEqual(
EXAMPLE['availability_zone_hints'], sot.availability_zone_hints
)
self.assertEqual(EXAMPLE['availability_zones'], sot.availability_zones)
self.assertEqual(EXAMPLE['created_at'], sot.created_at)
self.assertEqual(EXAMPLE['description'], sot.description)
self.assertEqual(EXAMPLE['dns_domain'], sot.dns_domain)
self.assertEqual(EXAMPLE['id'], sot.id)
self.assertEqual(EXAMPLE['ipv4_address_scope'],
sot.ipv4_address_scope_id)
self.assertEqual(EXAMPLE['ipv6_address_scope'],
sot.ipv6_address_scope_id)
self.assertEqual(
EXAMPLE['ipv4_address_scope'], sot.ipv4_address_scope_id
)
self.assertEqual(
EXAMPLE['ipv6_address_scope'], sot.ipv6_address_scope_id
)
self.assertFalse(sot.is_default)
self.assertEqual(EXAMPLE['mtu'], sot.mtu)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertTrue(sot.is_port_security_enabled)
self.assertEqual(EXAMPLE['project_id'], sot.project_id)
self.assertEqual(EXAMPLE['provider:network_type'],
sot.provider_network_type)
self.assertEqual(EXAMPLE['provider:physical_network'],
sot.provider_physical_network)
self.assertEqual(EXAMPLE['provider:segmentation_id'],
sot.provider_segmentation_id)
self.assertEqual(
EXAMPLE['provider:network_type'], sot.provider_network_type
)
self.assertEqual(
EXAMPLE['provider:physical_network'], sot.provider_physical_network
)
self.assertEqual(
EXAMPLE['provider:segmentation_id'], sot.provider_segmentation_id
)
self.assertEqual(EXAMPLE['qos_policy_id'], sot.qos_policy_id)
self.assertEqual(EXAMPLE['revision_number'], sot.revision_number)
self.assertTrue(sot.is_router_external)
@@ -95,31 +99,32 @@ class TestNetwork(base.TestCase):
self.assertEqual(EXAMPLE['vlan_transparent'], sot.is_vlan_transparent)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'description': 'description',
'name': 'name',
'project_id': 'project_id',
'status': 'status',
'ipv4_address_scope_id': 'ipv4_address_scope',
'ipv6_address_scope_id': 'ipv6_address_scope',
'is_admin_state_up': 'admin_state_up',
'is_port_security_enabled': 'port_security_enabled',
'is_router_external': 'router:external',
'is_shared': 'shared',
'provider_network_type': 'provider:network_type',
'provider_physical_network': 'provider:physical_network',
'provider_segmentation_id': 'provider:segmentation_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
},
sot._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'description': 'description',
'name': 'name',
'project_id': 'project_id',
'status': 'status',
'ipv4_address_scope_id': 'ipv4_address_scope',
'ipv6_address_scope_id': 'ipv6_address_scope',
'is_admin_state_up': 'admin_state_up',
'is_port_security_enabled': 'port_security_enabled',
'is_router_external': 'router:external',
'is_shared': 'shared',
'provider_network_type': 'provider:network_type',
'provider_physical_network': 'provider:physical_network',
'provider_segmentation_id': 'provider:segmentation_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
},
sot._query_mapping._mapping,
)
class TestDHCPAgentHostingNetwork(base.TestCase):
def test_basic(self):
net = network.DHCPAgentHostingNetwork()
self.assertEqual('network', net.resource_key)

View File

@@ -27,11 +27,16 @@ EXAMPLE = {
EXAMPLE_WITH_OPTIONAL = {
'network_id': IDENTIFIER,
'network_name': 'private',
'subnet_ip_availability': [{"used_ips": 3, "subnet_id":
"2e4db1d6-ab2d-4bb1-93bb-a003fdbc9b39",
"subnet_name": "private-subnet",
"ip_version": 6, "cidr": "fd91:c3ba:e818::/64",
"total_ips": 18446744073709551614}],
'subnet_ip_availability': [
{
"used_ips": 3,
"subnet_id": "2e4db1d6-ab2d-4bb1-93bb-a003fdbc9b39",
"subnet_name": "private-subnet",
"ip_version": 6,
"cidr": "fd91:c3ba:e818::/64",
"total_ips": 18446744073709551614,
}
],
'project_id': '2',
'total_ips': 1844,
'used_ips': 6,
@@ -39,7 +44,6 @@ EXAMPLE_WITH_OPTIONAL = {
class TestNetworkIPAvailability(base.TestCase):
def test_basic(self):
sot = network_ip_availability.NetworkIPAvailability()
self.assertEqual('network_ip_availability', sot.resource_key)
@@ -56,20 +60,25 @@ class TestNetworkIPAvailability(base.TestCase):
sot = network_ip_availability.NetworkIPAvailability(**EXAMPLE)
self.assertEqual(EXAMPLE['network_id'], sot.network_id)
self.assertEqual(EXAMPLE['network_name'], sot.network_name)
self.assertEqual(EXAMPLE['subnet_ip_availability'],
sot.subnet_ip_availability)
self.assertEqual(
EXAMPLE['subnet_ip_availability'], sot.subnet_ip_availability
)
self.assertEqual(EXAMPLE['project_id'], sot.project_id)
self.assertEqual(EXAMPLE['total_ips'], sot.total_ips)
self.assertEqual(EXAMPLE['used_ips'], sot.used_ips)
def test_make_it_with_optional(self):
sot = network_ip_availability.NetworkIPAvailability(
**EXAMPLE_WITH_OPTIONAL)
**EXAMPLE_WITH_OPTIONAL
)
self.assertEqual(EXAMPLE_WITH_OPTIONAL['network_id'], sot.network_id)
self.assertEqual(EXAMPLE_WITH_OPTIONAL['network_name'],
sot.network_name)
self.assertEqual(EXAMPLE_WITH_OPTIONAL['subnet_ip_availability'],
sot.subnet_ip_availability)
self.assertEqual(
EXAMPLE_WITH_OPTIONAL['network_name'], sot.network_name
)
self.assertEqual(
EXAMPLE_WITH_OPTIONAL['subnet_ip_availability'],
sot.subnet_ip_availability,
)
self.assertEqual(EXAMPLE_WITH_OPTIONAL['project_id'], sot.project_id)
self.assertEqual(EXAMPLE_WITH_OPTIONAL['total_ips'], sot.total_ips)
self.assertEqual(EXAMPLE_WITH_OPTIONAL['used_ips'], sot.used_ips)

View File

@@ -34,12 +34,12 @@ EXAMPLE = {
class TestNetworkSegmentRange(base.TestCase):
def test_basic(self):
test_seg_range = network_segment_range.NetworkSegmentRange()
self.assertEqual('network_segment_range', test_seg_range.resource_key)
self.assertEqual('network_segment_ranges',
test_seg_range.resources_key)
self.assertEqual(
'network_segment_ranges', test_seg_range.resources_key
)
self.assertEqual('/network_segment_ranges', test_seg_range.base_path)
self.assertTrue(test_seg_range.allow_create)
@@ -56,8 +56,9 @@ class TestNetworkSegmentRange(base.TestCase):
self.assertEqual(EXAMPLE['shared'], test_seg_range.shared)
self.assertEqual(EXAMPLE['project_id'], test_seg_range.project_id)
self.assertEqual(EXAMPLE['network_type'], test_seg_range.network_type)
self.assertEqual(EXAMPLE['physical_network'],
test_seg_range.physical_network)
self.assertEqual(
EXAMPLE['physical_network'], test_seg_range.physical_network
)
self.assertEqual(EXAMPLE['minimum'], test_seg_range.minimum)
self.assertEqual(EXAMPLE['maximum'], test_seg_range.maximum)
self.assertEqual(EXAMPLE['used'], test_seg_range.used)

View File

@@ -41,7 +41,6 @@ EXAMPLE = {
class TestPool(base.TestCase):
def test_basic(self):
sot = pool.Pool()
self.assertEqual('pool', sot.resource_key)
@@ -59,8 +58,9 @@ class TestPool(base.TestCase):
self.assertEqual(EXAMPLE['description'], sot.description)
self.assertEqual(EXAMPLE['healthmonitor_id'], sot.health_monitor_id)
self.assertEqual(EXAMPLE['health_monitors'], sot.health_monitor_ids)
self.assertEqual(EXAMPLE['health_monitor_status'],
sot.health_monitor_status)
self.assertEqual(
EXAMPLE['health_monitor_status'], sot.health_monitor_status
)
self.assertEqual(EXAMPLE['id'], sot.id)
self.assertEqual(EXAMPLE['lb_algorithm'], sot.lb_algorithm)
self.assertEqual(EXAMPLE['listeners'], sot.listener_ids)
@@ -70,8 +70,9 @@ class TestPool(base.TestCase):
self.assertEqual(EXAMPLE['project_id'], sot.project_id)
self.assertEqual(EXAMPLE['protocol'], sot.protocol)
self.assertEqual(EXAMPLE['provider'], sot.provider)
self.assertEqual(EXAMPLE['session_persistence'],
sot.session_persistence)
self.assertEqual(
EXAMPLE['session_persistence'], sot.session_persistence
)
self.assertEqual(EXAMPLE['status'], sot.status)
self.assertEqual(EXAMPLE['status_description'], sot.status_description)
self.assertEqual(EXAMPLE['subnet_id'], sot.subnet_id)

View File

@@ -29,7 +29,6 @@ EXAMPLE = {
class TestPoolMember(base.TestCase):
def test_basic(self):
sot = pool_member.PoolMember()
self.assertEqual('member', sot.resource_key)

View File

@@ -45,28 +45,32 @@ EXAMPLE = {
'qos_policy_id': '21',
'propagate_uplink_status': False,
'resource_request': {
'required': [
'CUSTOM_PHYSNET_PUBLIC', 'CUSTOM_VNIC_TYPE_NORMAL'],
'required': ['CUSTOM_PHYSNET_PUBLIC', 'CUSTOM_VNIC_TYPE_NORMAL'],
'resources': {
'NET_BW_EGR_KILOBIT_PER_SEC': 1,
'NET_BW_IGR_KILOBIT_PER_SEC': 2}},
'NET_BW_IGR_KILOBIT_PER_SEC': 2,
},
},
'revision_number': 22,
'security_groups': ['23'],
'status': '25',
'project_id': '26',
'trunk_details': {
'trunk_id': '27',
'sub_ports': [{
'port_id': '28',
'segmentation_id': 29,
'segmentation_type': '30',
'mac_address': '31'}]},
'sub_ports': [
{
'port_id': '28',
'segmentation_id': 29,
'segmentation_type': '30',
'mac_address': '31',
}
],
},
'updated_at': '2016-07-09T12:14:57.233772',
}
class TestPort(base.TestCase):
def test_basic(self):
sot = port.Port()
self.assertEqual('port', sot.resource_key)
@@ -78,46 +82,51 @@ class TestPort(base.TestCase):
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertDictEqual({"binding:host_id": "binding:host_id",
"binding:profile": "binding:profile",
"binding:vif_details": "binding:vif_details",
"binding:vif_type": "binding:vif_type",
"binding:vnic_type": "binding:vnic_type",
"description": "description",
"device_id": "device_id",
"device_owner": "device_owner",
"fields": "fields",
"fixed_ips": "fixed_ips",
"id": "id",
"ip_address": "ip_address",
"mac_address": "mac_address",
"name": "name",
"network_id": "network_id",
"security_groups": "security_groups",
"status": "status",
"subnet_id": "subnet_id",
"is_admin_state_up": "admin_state_up",
"is_port_security_enabled":
"port_security_enabled",
"project_id": "project_id",
"security_group_ids": "security_groups",
"limit": "limit",
"marker": "marker",
"any_tags": "tags-any",
"not_any_tags": "not-tags-any",
"not_tags": "not-tags",
"tags": "tags"},
sot._query_mapping._mapping)
self.assertDictEqual(
{
"binding:host_id": "binding:host_id",
"binding:profile": "binding:profile",
"binding:vif_details": "binding:vif_details",
"binding:vif_type": "binding:vif_type",
"binding:vnic_type": "binding:vnic_type",
"description": "description",
"device_id": "device_id",
"device_owner": "device_owner",
"fields": "fields",
"fixed_ips": "fixed_ips",
"id": "id",
"ip_address": "ip_address",
"mac_address": "mac_address",
"name": "name",
"network_id": "network_id",
"security_groups": "security_groups",
"status": "status",
"subnet_id": "subnet_id",
"is_admin_state_up": "admin_state_up",
"is_port_security_enabled": "port_security_enabled",
"project_id": "project_id",
"security_group_ids": "security_groups",
"limit": "limit",
"marker": "marker",
"any_tags": "tags-any",
"not_any_tags": "not-tags-any",
"not_tags": "not-tags",
"tags": "tags",
},
sot._query_mapping._mapping,
)
def test_make_it(self):
sot = port.Port(**EXAMPLE)
self.assertTrue(sot.is_admin_state_up)
self.assertEqual(EXAMPLE['allowed_address_pairs'],
sot.allowed_address_pairs)
self.assertEqual(
EXAMPLE['allowed_address_pairs'], sot.allowed_address_pairs
)
self.assertEqual(EXAMPLE['binding:host_id'], sot.binding_host_id)
self.assertEqual(EXAMPLE['binding:profile'], sot.binding_profile)
self.assertEqual(EXAMPLE['binding:vif_details'],
sot.binding_vif_details)
self.assertEqual(
EXAMPLE['binding:vif_details'], sot.binding_vif_details
)
self.assertEqual(EXAMPLE['binding:vif_type'], sot.binding_vif_type)
self.assertEqual(EXAMPLE['binding:vnic_type'], sot.binding_vnic_type)
self.assertEqual(EXAMPLE['created_at'], sot.created_at)
@@ -136,14 +145,17 @@ class TestPort(base.TestCase):
self.assertEqual(EXAMPLE['mac_address'], sot.mac_address)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertEqual(EXAMPLE['network_id'], sot.network_id)
self.assertEqual(EXAMPLE['numa_affinity_policy'],
sot.numa_affinity_policy)
self.assertEqual(
EXAMPLE['numa_affinity_policy'], sot.numa_affinity_policy
)
self.assertTrue(sot.is_port_security_enabled)
self.assertEqual(EXAMPLE['qos_network_policy_id'],
sot.qos_network_policy_id)
self.assertEqual(
EXAMPLE['qos_network_policy_id'], sot.qos_network_policy_id
)
self.assertEqual(EXAMPLE['qos_policy_id'], sot.qos_policy_id)
self.assertEqual(EXAMPLE['propagate_uplink_status'],
sot.propagate_uplink_status)
self.assertEqual(
EXAMPLE['propagate_uplink_status'], sot.propagate_uplink_status
)
self.assertEqual(EXAMPLE['resource_request'], sot.resource_request)
self.assertEqual(EXAMPLE['revision_number'], sot.revision_number)
self.assertEqual(EXAMPLE['security_groups'], sot.security_group_ids)

View File

@@ -22,39 +22,43 @@ EXAMPLE = {
'internal_port': 80,
'internal_port_id': 'internal-port-uuid',
'external_port': 8080,
'description': 'description'
'description': 'description',
}
class TestFloatingIP(base.TestCase):
def test_basic(self):
sot = port_forwarding.PortForwarding()
self.assertEqual('port_forwarding', sot.resource_key)
self.assertEqual('port_forwardings', sot.resources_key)
self.assertEqual(
'/floatingips/%(floatingip_id)s/port_forwardings',
sot.base_path)
'/floatingips/%(floatingip_id)s/port_forwardings', sot.base_path
)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_fetch)
self.assertTrue(sot.allow_commit)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertDictEqual({'internal_port_id': 'internal_port_id',
'external_port': 'external_port',
'limit': 'limit',
'marker': 'marker',
'protocol': 'protocol'},
sot._query_mapping._mapping)
self.assertDictEqual(
{
'internal_port_id': 'internal_port_id',
'external_port': 'external_port',
'limit': 'limit',
'marker': 'marker',
'protocol': 'protocol',
},
sot._query_mapping._mapping,
)
def test_make_it(self):
sot = port_forwarding.PortForwarding(**EXAMPLE)
self.assertEqual(EXAMPLE['id'], sot.id)
self.assertEqual(EXAMPLE['floatingip_id'], sot.floatingip_id)
self.assertEqual(EXAMPLE['protocol'], sot.protocol)
self.assertEqual(EXAMPLE['internal_ip_address'],
sot.internal_ip_address)
self.assertEqual(
EXAMPLE['internal_ip_address'], sot.internal_ip_address
)
self.assertEqual(EXAMPLE['internal_port'], sot.internal_port)
self.assertEqual(EXAMPLE['internal_port_id'], sot.internal_port_id)
self.assertEqual(EXAMPLE['external_port'], sot.external_port)

File diff suppressed because it is too large Load Diff

Some files were not shown because too many files have changed in this diff Show More