Blackify openstack.load_balancer

Black used with the '-l 79 -S' flags.

A future change will ignore this commit in git-blame history by adding a
'git-blame-ignore-revs' file.

Change-Id: I4f3f4228b94230d3b2f52bed4e928df67f82017f
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane
2023-05-05 10:57:04 +01:00
parent f8e42017e7
commit 82c2a53402
28 changed files with 1386 additions and 942 deletions

View File

@@ -12,8 +12,9 @@
from openstack.load_balancer.v2 import amphora as _amphora
from openstack.load_balancer.v2 import availability_zone as _availability_zone
from openstack.load_balancer.v2 import availability_zone_profile as \
_availability_zone_profile
from openstack.load_balancer.v2 import (
availability_zone_profile as _availability_zone_profile,
)
from openstack.load_balancer.v2 import flavor as _flavor
from openstack.load_balancer.v2 import flavor_profile as _flavor_profile
from openstack.load_balancer.v2 import health_monitor as _hm
@@ -33,8 +34,7 @@ class Proxy(proxy.Proxy):
_resource_registry = {
"amphora": _amphora.Amphora,
"availability_zone": _availability_zone.AvailabilityZone,
"availability_zone_profile":
_availability_zone_profile.AvailabilityZoneProfile,
"availability_zone_profile": _availability_zone_profile.AvailabilityZoneProfile, # noqa: E501
"flavor": _flavor.Flavor,
"flavor_profile": _flavor_profile.FlavorProfile,
"health_monitor": _hm.HealthMonitor,
@@ -44,7 +44,7 @@ class Proxy(proxy.Proxy):
"member": _member.Member,
"pool": _pool.Pool,
"provider": _provider.Provider,
"quota": _quota.Quota
"quota": _quota.Quota,
}
def create_load_balancer(self, **attrs):
@@ -82,8 +82,9 @@ class Proxy(proxy.Proxy):
:returns: One
:class:`~openstack.load_balancer.v2.load_balancer.LoadBalancerStats`
"""
return self._get(_lb.LoadBalancerStats, lb_id=load_balancer,
requires_id=False)
return self._get(
_lb.LoadBalancerStats, lb_id=load_balancer, requires_id=False
)
def load_balancers(self, **query):
"""Retrieve a generator of load balancers
@@ -92,8 +93,9 @@ class Proxy(proxy.Proxy):
"""
return self._list(_lb.LoadBalancer, **query)
def delete_load_balancer(self, load_balancer, ignore_missing=True,
cascade=False):
def delete_load_balancer(
self, load_balancer, ignore_missing=True, cascade=False
):
"""Delete a load balancer
:param load_balancer: The load_balancer can be either the ID or a
@@ -111,8 +113,9 @@ class Proxy(proxy.Proxy):
"""
load_balancer = self._get_resource(_lb.LoadBalancer, load_balancer)
load_balancer.cascade = cascade
return self._delete(_lb.LoadBalancer, load_balancer,
ignore_missing=ignore_missing)
return self._delete(
_lb.LoadBalancer, load_balancer, ignore_missing=ignore_missing
)
def find_load_balancer(self, name_or_id, ignore_missing=True):
"""Find a single load balancer
@@ -126,8 +129,9 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
return self._find(_lb.LoadBalancer, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_lb.LoadBalancer, name_or_id, ignore_missing=ignore_missing
)
def update_load_balancer(self, load_balancer, **attrs):
"""Update a load balancer
@@ -143,8 +147,14 @@ class Proxy(proxy.Proxy):
"""
return self._update(_lb.LoadBalancer, load_balancer, **attrs)
def wait_for_load_balancer(self, name_or_id, status='ACTIVE',
failures=['ERROR'], interval=2, wait=300):
def wait_for_load_balancer(
self,
name_or_id,
status='ACTIVE',
failures=['ERROR'],
interval=2,
wait=300,
):
"""Wait for load balancer status
:param name_or_id: The name or ID of the load balancer.
@@ -167,8 +177,15 @@ class Proxy(proxy.Proxy):
"""
lb = self._find(_lb.LoadBalancer, name_or_id, ignore_missing=False)
return resource.wait_for_status(self, lb, status, failures, interval,
wait, attribute='provisioning_status')
return resource.wait_for_status(
self,
lb,
status,
failures,
interval,
wait,
attribute='provisioning_status',
)
def failover_load_balancer(self, load_balancer, **attrs):
"""Failover a load balancer
@@ -206,8 +223,9 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
self._delete(_listener.Listener, listener,
ignore_missing=ignore_missing)
self._delete(
_listener.Listener, listener, ignore_missing=ignore_missing
)
def find_listener(self, name_or_id, ignore_missing=True):
"""Find a single listener
@@ -222,8 +240,9 @@ class Proxy(proxy.Proxy):
:returns: One :class:`~openstack.load_balancer.v2.listener.Listener`
or None
"""
return self._find(_listener.Listener, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_listener.Listener, name_or_id, ignore_missing=ignore_missing
)
def get_listener(self, listener):
"""Get a single listener
@@ -250,8 +269,9 @@ class Proxy(proxy.Proxy):
:raises: :class:`~openstack.exceptions.ResourceNotFound` when no
resource can be found.
"""
return self._get(_listener.ListenerStats, listener_id=listener,
requires_id=False)
return self._get(
_listener.ListenerStats, listener_id=listener, requires_id=False
)
def listeners(self, **query):
"""Return a generator of listeners
@@ -322,8 +342,7 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
return self._delete(_pool.Pool, pool,
ignore_missing=ignore_missing)
return self._delete(_pool.Pool, pool, ignore_missing=ignore_missing)
def find_pool(self, name_or_id, ignore_missing=True):
"""Find a single pool
@@ -337,8 +356,9 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
return self._find(_pool.Pool, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_pool.Pool, name_or_id, ignore_missing=ignore_missing
)
def update_pool(self, pool, **attrs):
"""Update a pool
@@ -368,8 +388,7 @@ class Proxy(proxy.Proxy):
:rtype: :class:`~openstack.load_balancer.v2.member.Member`
"""
poolobj = self._get_resource(_pool.Pool, pool)
return self._create(_member.Member, pool_id=poolobj.id,
**attrs)
return self._create(_member.Member, pool_id=poolobj.id, **attrs)
def delete_member(self, member, pool, ignore_missing=True):
"""Delete a member
@@ -389,8 +408,12 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
poolobj = self._get_resource(_pool.Pool, pool)
self._delete(_member.Member, member,
ignore_missing=ignore_missing, pool_id=poolobj.id)
self._delete(
_member.Member,
member,
ignore_missing=ignore_missing,
pool_id=poolobj.id,
)
def find_member(self, name_or_id, pool, ignore_missing=True):
"""Find a single member
@@ -409,8 +432,12 @@ class Proxy(proxy.Proxy):
or None
"""
poolobj = self._get_resource(_pool.Pool, pool)
return self._find(_member.Member, name_or_id,
ignore_missing=ignore_missing, pool_id=poolobj.id)
return self._find(
_member.Member,
name_or_id,
ignore_missing=ignore_missing,
pool_id=poolobj.id,
)
def get_member(self, member, pool):
"""Get a single member
@@ -427,8 +454,7 @@ class Proxy(proxy.Proxy):
when no resource can be found.
"""
poolobj = self._get_resource(_pool.Pool, pool)
return self._get(_member.Member, member,
pool_id=poolobj.id)
return self._get(_member.Member, member, pool_id=poolobj.id)
def members(self, pool, **query):
"""Return a generator of members
@@ -461,8 +487,9 @@ class Proxy(proxy.Proxy):
:rtype: :class:`~openstack.load_balancer.v2.member.Member`
"""
poolobj = self._get_resource(_pool.Pool, pool)
return self._update(_member.Member, member,
pool_id=poolobj.id, **attrs)
return self._update(
_member.Member, member, pool_id=poolobj.id, **attrs
)
def find_health_monitor(self, name_or_id, ignore_missing=True):
"""Find a single health monitor
@@ -483,8 +510,9 @@ class Proxy(proxy.Proxy):
:raises: :class:`openstack.exceptions.ResourceNotFound` if nothing
is found and ignore_missing is ``False``.
"""
return self._find(_hm.HealthMonitor, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_hm.HealthMonitor, name_or_id, ignore_missing=ignore_missing
)
def create_health_monitor(self, **attrs):
"""Create a new health monitor from attributes
@@ -544,8 +572,9 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
return self._delete(_hm.HealthMonitor, healthmonitor,
ignore_missing=ignore_missing)
return self._delete(
_hm.HealthMonitor, healthmonitor, ignore_missing=ignore_missing
)
def update_health_monitor(self, healthmonitor, **attrs):
"""Update a health monitor
@@ -561,8 +590,7 @@ class Proxy(proxy.Proxy):
:rtype:
:class:`~openstack.load_balancer.v2.healthmonitor.HealthMonitor`
"""
return self._update(_hm.HealthMonitor, healthmonitor,
**attrs)
return self._update(_hm.HealthMonitor, healthmonitor, **attrs)
def create_l7_policy(self, **attrs):
"""Create a new l7policy from attributes
@@ -589,8 +617,9 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
self._delete(_l7policy.L7Policy, l7_policy,
ignore_missing=ignore_missing)
self._delete(
_l7policy.L7Policy, l7_policy, ignore_missing=ignore_missing
)
def find_l7_policy(self, name_or_id, ignore_missing=True):
"""Find a single l7policy
@@ -605,8 +634,9 @@ class Proxy(proxy.Proxy):
:returns: One :class:`~openstack.load_balancer.v2.l7_policy.L7Policy`
or None
"""
return self._find(_l7policy.L7Policy, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_l7policy.L7Policy, name_or_id, ignore_missing=ignore_missing
)
def get_l7_policy(self, l7_policy):
"""Get a single l7policy
@@ -660,8 +690,9 @@ class Proxy(proxy.Proxy):
:rtype: :class:`~openstack.load_balancer.v2.l7_rule.L7Rule`
"""
l7policyobj = self._get_resource(_l7policy.L7Policy, l7_policy)
return self._create(_l7rule.L7Rule, l7policy_id=l7policyobj.id,
**attrs)
return self._create(
_l7rule.L7Rule, l7policy_id=l7policyobj.id, **attrs
)
def delete_l7_rule(self, l7rule, l7_policy, ignore_missing=True):
"""Delete a l7rule
@@ -680,8 +711,12 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
l7policyobj = self._get_resource(_l7policy.L7Policy, l7_policy)
self._delete(_l7rule.L7Rule, l7rule, ignore_missing=ignore_missing,
l7policy_id=l7policyobj.id)
self._delete(
_l7rule.L7Rule,
l7rule,
ignore_missing=ignore_missing,
l7policy_id=l7policyobj.id,
)
def find_l7_rule(self, name_or_id, l7_policy, ignore_missing=True):
"""Find a single l7rule
@@ -700,9 +735,12 @@ class Proxy(proxy.Proxy):
or None
"""
l7policyobj = self._get_resource(_l7policy.L7Policy, l7_policy)
return self._find(_l7rule.L7Rule, name_or_id,
ignore_missing=ignore_missing,
l7policy_id=l7policyobj.id)
return self._find(
_l7rule.L7Rule,
name_or_id,
ignore_missing=ignore_missing,
l7policy_id=l7policyobj.id,
)
def get_l7_rule(self, l7rule, l7_policy):
"""Get a single l7rule
@@ -719,8 +757,7 @@ class Proxy(proxy.Proxy):
when no resource can be found.
"""
l7policyobj = self._get_resource(_l7policy.L7Policy, l7_policy)
return self._get(_l7rule.L7Rule, l7rule,
l7policy_id=l7policyobj.id)
return self._get(_l7rule.L7Rule, l7rule, l7policy_id=l7policyobj.id)
def l7_rules(self, l7_policy, **query):
"""Return a generator of l7rules
@@ -753,8 +790,9 @@ class Proxy(proxy.Proxy):
:rtype: :class:`~openstack.load_balancer.v2.l7_rule.L7Rule`
"""
l7policyobj = self._get_resource(_l7policy.L7Policy, l7_policy)
return self._update(_l7rule.L7Rule, l7rule,
l7policy_id=l7policyobj.id, **attrs)
return self._update(
_l7rule.L7Rule, l7rule, l7policy_id=l7policyobj.id, **attrs
)
def quotas(self, **query):
"""Return a generator of quotas
@@ -833,8 +871,9 @@ class Proxy(proxy.Proxy):
:returns: A generator of provider flavor capabilities instances
"""
return self._list(_provider.ProviderFlavorCapabilities,
provider=provider, **query)
return self._list(
_provider.ProviderFlavorCapabilities, provider=provider, **query
)
def create_flavor_profile(self, **attrs):
"""Create a new flavor profile from attributes
@@ -882,8 +921,11 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
self._delete(_flavor_profile.FlavorProfile, flavor_profile,
ignore_missing=ignore_missing)
self._delete(
_flavor_profile.FlavorProfile,
flavor_profile,
ignore_missing=ignore_missing,
)
def find_flavor_profile(self, name_or_id, ignore_missing=True):
"""Find a single flavor profile
@@ -897,8 +939,11 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
return self._find(_flavor_profile.FlavorProfile, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_flavor_profile.FlavorProfile,
name_or_id,
ignore_missing=ignore_missing,
)
def update_flavor_profile(self, flavor_profile, **attrs):
"""Update a flavor profile
@@ -913,8 +958,9 @@ class Proxy(proxy.Proxy):
:rtype:
:class:`~openstack.load_balancer.v2.flavor_profile.FlavorProfile`
"""
return self._update(_flavor_profile.FlavorProfile, flavor_profile,
**attrs)
return self._update(
_flavor_profile.FlavorProfile, flavor_profile, **attrs
)
def create_flavor(self, **attrs):
"""Create a new flavor from attributes
@@ -973,8 +1019,9 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
return self._find(_flavor.Flavor, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_flavor.Flavor, name_or_id, ignore_missing=ignore_missing
)
def update_flavor(self, flavor, **attrs):
"""Update a flavor
@@ -1019,8 +1066,9 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
return self._find(_amphora.Amphora, amphora_id,
ignore_missing=ignore_missing)
return self._find(
_amphora.Amphora, amphora_id, ignore_missing=ignore_missing
)
def configure_amphora(self, amphora_id, **attrs):
"""Update the configuration of an amphora agent
@@ -1052,8 +1100,9 @@ class Proxy(proxy.Proxy):
:rtype:
:class:`~openstack.load_balancer.v2.availability_zone_profile.AvailabilityZoneProfile`
"""
return self._create(_availability_zone_profile.AvailabilityZoneProfile,
**attrs)
return self._create(
_availability_zone_profile.AvailabilityZoneProfile, **attrs
)
def get_availability_zone_profile(self, *attrs):
"""Get an availability zone profile
@@ -1066,19 +1115,22 @@ class Proxy(proxy.Proxy):
:returns: One
:class:`~openstack.load_balancer.v2.availability_zone_profile.AvailabilityZoneProfile`
"""
return self._get(_availability_zone_profile.AvailabilityZoneProfile,
*attrs)
return self._get(
_availability_zone_profile.AvailabilityZoneProfile, *attrs
)
def availability_zone_profiles(self, **query):
"""Retrieve a generator of availability zone profiles
:returns: A generator of availability zone profiles instances
"""
return self._list(_availability_zone_profile.AvailabilityZoneProfile,
**query)
return self._list(
_availability_zone_profile.AvailabilityZoneProfile, **query
)
def delete_availability_zone_profile(self, availability_zone_profile,
ignore_missing=True):
def delete_availability_zone_profile(
self, availability_zone_profile, ignore_missing=True
):
"""Delete an availability zone profile
:param availability_zone_profile: The availability_zone_profile can be
@@ -1093,8 +1145,11 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
self._delete(_availability_zone_profile.AvailabilityZoneProfile,
availability_zone_profile, ignore_missing=ignore_missing)
self._delete(
_availability_zone_profile.AvailabilityZoneProfile,
availability_zone_profile,
ignore_missing=ignore_missing,
)
def find_availability_zone_profile(self, name_or_id, ignore_missing=True):
"""Find a single availability zone profile
@@ -1108,11 +1163,15 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
return self._find(_availability_zone_profile.AvailabilityZoneProfile,
name_or_id, ignore_missing=ignore_missing)
return self._find(
_availability_zone_profile.AvailabilityZoneProfile,
name_or_id,
ignore_missing=ignore_missing,
)
def update_availability_zone_profile(self, availability_zone_profile,
**attrs):
def update_availability_zone_profile(
self, availability_zone_profile, **attrs
):
"""Update an availability zone profile
:param availability_zone_profile: The availability_zone_profile can be
@@ -1126,8 +1185,11 @@ class Proxy(proxy.Proxy):
:rtype:
:class:`~openstack.load_balancer.v2.availability_zone_profile.AvailabilityZoneProfile`
"""
return self._update(_availability_zone_profile.AvailabilityZoneProfile,
availability_zone_profile, **attrs)
return self._update(
_availability_zone_profile.AvailabilityZoneProfile,
availability_zone_profile,
**attrs
)
def create_availability_zone(self, **attrs):
"""Create a new availability zone from attributes
@@ -1177,8 +1239,11 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
self._delete(_availability_zone.AvailabilityZone, availability_zone,
ignore_missing=ignore_missing)
self._delete(
_availability_zone.AvailabilityZone,
availability_zone,
ignore_missing=ignore_missing,
)
def find_availability_zone(self, name_or_id, ignore_missing=True):
"""Find a single availability zone
@@ -1192,8 +1257,11 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
return self._find(_availability_zone.AvailabilityZone, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_availability_zone.AvailabilityZone,
name_or_id,
ignore_missing=ignore_missing,
)
def update_availability_zone(self, availability_zone, **attrs):
"""Update an availability zone
@@ -1209,5 +1277,6 @@ class Proxy(proxy.Proxy):
:rtype:
:class:`~openstack.load_balancer.v2.availability_zone.AvailabilityZone`
"""
return self._update(_availability_zone.AvailabilityZone,
availability_zone, **attrs)
return self._update(
_availability_zone.AvailabilityZone, availability_zone, **attrs
)

View File

@@ -28,10 +28,26 @@ class Amphora(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'id', 'loadbalancer_id', 'compute_id', 'lb_network_ip', 'vrrp_ip',
'ha_ip', 'vrrp_port_id', 'ha_port_id', 'cert_expiration', 'cert_busy',
'role', 'status', 'vrrp_interface', 'vrrp_id', 'vrrp_priority',
'cached_zone', 'created_at', 'updated_at', 'image_id', 'image_id'
'id',
'loadbalancer_id',
'compute_id',
'lb_network_ip',
'vrrp_ip',
'ha_ip',
'vrrp_port_id',
'ha_port_id',
'cert_expiration',
'cert_busy',
'role',
'status',
'vrrp_interface',
'vrrp_id',
'vrrp_priority',
'cached_zone',
'created_at',
'updated_at',
'image_id',
'image_id',
)
# Properties
@@ -99,7 +115,8 @@ class AmphoraConfig(resource.Resource):
# way to pass has_body into this function, so overriding the method here.
def commit(self, session, base_path=None):
return super(AmphoraConfig, self).commit(
session, base_path=base_path, has_body=False)
session, base_path=base_path, has_body=False
)
class AmphoraFailover(resource.Resource):
@@ -123,4 +140,5 @@ class AmphoraFailover(resource.Resource):
# way to pass has_body into this function, so overriding the method here.
def commit(self, session, base_path=None):
return super(AmphoraFailover, self).commit(
session, base_path=base_path, has_body=False)
session, base_path=base_path, has_body=False
)

View File

@@ -27,8 +27,10 @@ class AvailabilityZone(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'name', 'description', 'availability_zone_profile_id',
is_enabled='enabled'
'name',
'description',
'availability_zone_profile_id',
is_enabled='enabled',
)
# Properties
@@ -38,6 +40,7 @@ class AvailabilityZone(resource.Resource):
description = resource.Body('description')
#: The associated availability zone profile ID
availability_zone_profile_id = resource.Body(
'availability_zone_profile_id')
'availability_zone_profile_id'
)
#: Whether the availability zone is enabled for use or not.
is_enabled = resource.Body('enabled')

View File

@@ -26,10 +26,22 @@ class HealthMonitor(resource.Resource, tag.TagMixin):
allow_commit = True
_query_mapping = resource.QueryParameters(
'name', 'created_at', 'updated_at', 'delay', 'expected_codes',
'http_method', 'max_retries', 'max_retries_down', 'pool_id',
'provisioning_status', 'operating_status', 'timeout',
'project_id', 'type', 'url_path', is_admin_state_up='admin_state_up',
'name',
'created_at',
'updated_at',
'delay',
'expected_codes',
'http_method',
'max_retries',
'max_retries_down',
'pool_id',
'provisioning_status',
'operating_status',
'timeout',
'project_id',
'type',
'url_path',
is_admin_state_up='admin_state_up',
**tag.TagMixin._tag_query_parameters
)

View File

@@ -26,9 +26,17 @@ class L7Policy(resource.Resource, tag.TagMixin):
allow_delete = True
_query_mapping = resource.QueryParameters(
'action', 'description', 'listener_id', 'name', 'position',
'redirect_pool_id', 'redirect_url', 'provisioning_status',
'operating_status', 'redirect_prefix', 'project_id',
'action',
'description',
'listener_id',
'name',
'position',
'redirect_pool_id',
'redirect_url',
'provisioning_status',
'operating_status',
'redirect_prefix',
'project_id',
is_admin_state_up='admin_state_up',
**tag.TagMixin._tag_query_parameters
)

View File

@@ -26,10 +26,19 @@ class L7Rule(resource.Resource, tag.TagMixin):
allow_delete = True
_query_mapping = resource.QueryParameters(
'compare_type', 'created_at', 'invert', 'key', 'project_id',
'provisioning_status', 'type', 'updated_at', 'rule_value',
'operating_status', is_admin_state_up='admin_state_up',
l7_policy_id='l7policy_id', **tag.TagMixin._tag_query_parameters
'compare_type',
'created_at',
'invert',
'key',
'project_id',
'provisioning_status',
'type',
'updated_at',
'rule_value',
'operating_status',
is_admin_state_up='admin_state_up',
l7_policy_id='l7policy_id',
**tag.TagMixin._tag_query_parameters
)
#: Properties

View File

@@ -26,13 +26,29 @@ class Listener(resource.Resource, tag.TagMixin):
allow_list = True
_query_mapping = resource.QueryParameters(
'connection_limit', 'default_pool_id', 'default_tls_container_ref',
'description', 'name', 'project_id', 'protocol', 'protocol_port',
'created_at', 'updated_at', 'provisioning_status', 'operating_status',
'sni_container_refs', 'insert_headers', 'load_balancer_id',
'timeout_client_data', 'timeout_member_connect',
'timeout_member_data', 'timeout_tcp_inspect', 'allowed_cidrs',
'tls_ciphers', 'tls_versions', 'alpn_protocols',
'connection_limit',
'default_pool_id',
'default_tls_container_ref',
'description',
'name',
'project_id',
'protocol',
'protocol_port',
'created_at',
'updated_at',
'provisioning_status',
'operating_status',
'sni_container_refs',
'insert_headers',
'load_balancer_id',
'timeout_client_data',
'timeout_member_connect',
'timeout_member_data',
'timeout_tcp_inspect',
'allowed_cidrs',
'tls_ciphers',
'tls_versions',
'alpn_protocols',
is_admin_state_up='admin_state_up',
**tag.TagMixin._tag_query_parameters
)

View File

@@ -26,10 +26,20 @@ class LoadBalancer(resource.Resource, tag.TagMixin):
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'flavor_id', 'name', 'project_id', 'provider',
'vip_address', 'vip_network_id', 'vip_port_id', 'vip_subnet_id',
'vip_qos_policy_id', 'provisioning_status', 'operating_status',
'availability_zone', is_admin_state_up='admin_state_up',
'description',
'flavor_id',
'name',
'project_id',
'provider',
'vip_address',
'vip_network_id',
'vip_port_id',
'vip_subnet_id',
'vip_qos_policy_id',
'provisioning_status',
'operating_status',
'availability_zone',
is_admin_state_up='admin_state_up',
**tag.TagMixin._tag_query_parameters
)
@@ -76,14 +86,17 @@ class LoadBalancer(resource.Resource, tag.TagMixin):
def delete(self, session, error_message=None):
request = self._prepare_request()
params = {}
if (hasattr(self, 'cascade') and isinstance(self.cascade, bool)
and self.cascade):
if (
hasattr(self, 'cascade')
and isinstance(self.cascade, bool)
and self.cascade
):
params['cascade'] = True
response = session.delete(request.url,
params=params)
response = session.delete(request.url, params=params)
self._translate_response(response, has_body=False,
error_message=error_message)
self._translate_response(
response, has_body=False, error_message=error_message
)
return self
@@ -134,4 +147,5 @@ class LoadBalancerFailover(resource.Resource):
# way to pass has_body into this function, so overriding the method here.
def commit(self, session, base_path=None):
return super(LoadBalancerFailover, self).commit(
session, base_path=base_path, has_body=False)
session, base_path=base_path, has_body=False
)

View File

@@ -26,9 +26,19 @@ class Member(resource.Resource, tag.TagMixin):
allow_list = True
_query_mapping = resource.QueryParameters(
'address', 'name', 'protocol_port', 'subnet_id', 'weight',
'created_at', 'updated_at', 'provisioning_status', 'operating_status',
'project_id', 'monitor_address', 'monitor_port', 'backup',
'address',
'name',
'protocol_port',
'subnet_id',
'weight',
'created_at',
'updated_at',
'provisioning_status',
'operating_status',
'project_id',
'monitor_address',
'monitor_port',
'backup',
is_admin_state_up='admin_state_up',
**tag.TagMixin._tag_query_parameters
)

View File

@@ -26,10 +26,22 @@ class Pool(resource.Resource, tag.TagMixin):
allow_commit = True
_query_mapping = resource.QueryParameters(
'health_monitor_id', 'lb_algorithm', 'listener_id', 'loadbalancer_id',
'description', 'name', 'project_id', 'protocol',
'created_at', 'updated_at', 'provisioning_status', 'operating_status',
'tls_enabled', 'tls_ciphers', 'tls_versions', 'alpn_protocols',
'health_monitor_id',
'lb_algorithm',
'listener_id',
'loadbalancer_id',
'description',
'name',
'project_id',
'protocol',
'created_at',
'updated_at',
'provisioning_status',
'operating_status',
'tls_enabled',
'tls_ciphers',
'tls_versions',
'alpn_protocols',
is_admin_state_up='admin_state_up',
**tag.TagMixin._tag_query_parameters
)

View File

@@ -41,11 +41,12 @@ class Quota(resource.Resource):
#: The ID of the project this quota is associated with.
project_id = resource.Body('project_id', alternate_id=True)
def _prepare_request(self, requires_id=True,
base_path=None, prepend_key=False, **kwargs):
_request = super(Quota, self)._prepare_request(requires_id,
prepend_key,
base_path=base_path)
def _prepare_request(
self, requires_id=True, base_path=None, prepend_key=False, **kwargs
):
_request = super(Quota, self)._prepare_request(
requires_id, prepend_key, base_path=base_path
)
if self.resource_key in _request.body:
_body = _request.body[self.resource_key]
else:

View File

@@ -84,56 +84,73 @@ class TestLoadBalancer(base.BaseFunctionalTest):
self.VIP_SUBNET_ID = subnets[0].id
self.PROJECT_ID = self.conn.session.get_project_id()
test_quota = self.conn.load_balancer.update_quota(
self.PROJECT_ID, **{'load_balancer': 100,
'pool': 100,
'listener': 100,
'health_monitor': 100,
'member': 100})
self.PROJECT_ID,
**{
'load_balancer': 100,
'pool': 100,
'listener': 100,
'health_monitor': 100,
'member': 100,
}
)
assert isinstance(test_quota, quota.Quota)
self.assertEqual(self.PROJECT_ID, test_quota.id)
test_flavor_profile = self.conn.load_balancer.create_flavor_profile(
name=self.FLAVOR_PROFILE_NAME, provider_name=self.AMPHORA,
flavor_data=self.FLAVOR_DATA)
name=self.FLAVOR_PROFILE_NAME,
provider_name=self.AMPHORA,
flavor_data=self.FLAVOR_DATA,
)
assert isinstance(test_flavor_profile, flavor_profile.FlavorProfile)
self.assertEqual(self.FLAVOR_PROFILE_NAME, test_flavor_profile.name)
self.FLAVOR_PROFILE_ID = test_flavor_profile.id
test_flavor = self.conn.load_balancer.create_flavor(
name=self.FLAVOR_NAME, flavor_profile_id=self.FLAVOR_PROFILE_ID,
is_enabled=True, description=self.DESCRIPTION)
name=self.FLAVOR_NAME,
flavor_profile_id=self.FLAVOR_PROFILE_ID,
is_enabled=True,
description=self.DESCRIPTION,
)
assert isinstance(test_flavor, flavor.Flavor)
self.assertEqual(self.FLAVOR_NAME, test_flavor.name)
self.FLAVOR_ID = test_flavor.id
test_az_profile = \
test_az_profile = (
self.conn.load_balancer.create_availability_zone_profile(
name=self.AVAILABILITY_ZONE_PROFILE_NAME,
provider_name=self.AMPHORA,
availability_zone_data=self.AVAILABILITY_ZONE_DATA)
assert isinstance(test_az_profile,
availability_zone_profile.AvailabilityZoneProfile)
self.assertEqual(self.AVAILABILITY_ZONE_PROFILE_NAME,
test_az_profile.name)
availability_zone_data=self.AVAILABILITY_ZONE_DATA,
)
)
assert isinstance(
test_az_profile, availability_zone_profile.AvailabilityZoneProfile
)
self.assertEqual(
self.AVAILABILITY_ZONE_PROFILE_NAME, test_az_profile.name
)
self.AVAILABILITY_ZONE_PROFILE_ID = test_az_profile.id
test_az = self.conn.load_balancer.create_availability_zone(
name=self.AVAILABILITY_ZONE_NAME,
availability_zone_profile_id=self.AVAILABILITY_ZONE_PROFILE_ID,
is_enabled=True, description=self.DESCRIPTION)
is_enabled=True,
description=self.DESCRIPTION,
)
assert isinstance(test_az, availability_zone.AvailabilityZone)
self.assertEqual(self.AVAILABILITY_ZONE_NAME, test_az.name)
test_lb = self.conn.load_balancer.create_load_balancer(
name=self.LB_NAME, vip_subnet_id=self.VIP_SUBNET_ID,
project_id=self.PROJECT_ID)
name=self.LB_NAME,
vip_subnet_id=self.VIP_SUBNET_ID,
project_id=self.PROJECT_ID,
)
assert isinstance(test_lb, load_balancer.LoadBalancer)
self.assertEqual(self.LB_NAME, test_lb.name)
# Wait for the LB to go ACTIVE. On non-virtualization enabled hosts
# it can take nova up to ten minutes to boot a VM.
self.conn.load_balancer.wait_for_load_balancer(
test_lb.id, interval=1,
wait=self._wait_for_timeout)
test_lb.id, interval=1, wait=self._wait_for_timeout
)
self.LB_ID = test_lb.id
amphorae = self.conn.load_balancer.amphorae(loadbalancer_id=self.LB_ID)
@@ -141,113 +158,156 @@ class TestLoadBalancer(base.BaseFunctionalTest):
self.AMPHORA_ID = amp.id
test_listener = self.conn.load_balancer.create_listener(
name=self.LISTENER_NAME, protocol=self.PROTOCOL,
protocol_port=self.PROTOCOL_PORT, loadbalancer_id=self.LB_ID)
name=self.LISTENER_NAME,
protocol=self.PROTOCOL,
protocol_port=self.PROTOCOL_PORT,
loadbalancer_id=self.LB_ID,
)
assert isinstance(test_listener, listener.Listener)
self.assertEqual(self.LISTENER_NAME, test_listener.name)
self.LISTENER_ID = test_listener.id
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_pool = self.conn.load_balancer.create_pool(
name=self.POOL_NAME, protocol=self.PROTOCOL,
lb_algorithm=self.LB_ALGORITHM, listener_id=self.LISTENER_ID)
name=self.POOL_NAME,
protocol=self.PROTOCOL,
lb_algorithm=self.LB_ALGORITHM,
listener_id=self.LISTENER_ID,
)
assert isinstance(test_pool, pool.Pool)
self.assertEqual(self.POOL_NAME, test_pool.name)
self.POOL_ID = test_pool.id
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_member = self.conn.load_balancer.create_member(
pool=self.POOL_ID, name=self.MEMBER_NAME,
pool=self.POOL_ID,
name=self.MEMBER_NAME,
address=self.MEMBER_ADDRESS,
protocol_port=self.PROTOCOL_PORT, weight=self.WEIGHT)
protocol_port=self.PROTOCOL_PORT,
weight=self.WEIGHT,
)
assert isinstance(test_member, member.Member)
self.assertEqual(self.MEMBER_NAME, test_member.name)
self.MEMBER_ID = test_member.id
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_hm = self.conn.load_balancer.create_health_monitor(
pool_id=self.POOL_ID, name=self.HM_NAME, delay=self.DELAY,
timeout=self.TIMEOUT, max_retries=self.MAX_RETRY,
type=self.HM_TYPE)
pool_id=self.POOL_ID,
name=self.HM_NAME,
delay=self.DELAY,
timeout=self.TIMEOUT,
max_retries=self.MAX_RETRY,
type=self.HM_TYPE,
)
assert isinstance(test_hm, health_monitor.HealthMonitor)
self.assertEqual(self.HM_NAME, test_hm.name)
self.HM_ID = test_hm.id
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_l7policy = self.conn.load_balancer.create_l7_policy(
listener_id=self.LISTENER_ID, name=self.L7POLICY_NAME,
action=self.ACTION, redirect_url=self.REDIRECT_URL)
listener_id=self.LISTENER_ID,
name=self.L7POLICY_NAME,
action=self.ACTION,
redirect_url=self.REDIRECT_URL,
)
assert isinstance(test_l7policy, l7_policy.L7Policy)
self.assertEqual(self.L7POLICY_NAME, test_l7policy.name)
self.L7POLICY_ID = test_l7policy.id
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_l7rule = self.conn.load_balancer.create_l7_rule(
l7_policy=self.L7POLICY_ID, compare_type=self.COMPARE_TYPE,
type=self.L7RULE_TYPE, value=self.L7RULE_VALUE)
l7_policy=self.L7POLICY_ID,
compare_type=self.COMPARE_TYPE,
type=self.L7RULE_TYPE,
value=self.L7RULE_VALUE,
)
assert isinstance(test_l7rule, l7_rule.L7Rule)
self.assertEqual(self.COMPARE_TYPE, test_l7rule.compare_type)
self.L7RULE_ID = test_l7rule.id
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
def tearDown(self):
self.conn.load_balancer.get_load_balancer(self.LB_ID)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
self.conn.load_balancer.delete_quota(self.PROJECT_ID,
ignore_missing=False)
self.conn.load_balancer.delete_quota(
self.PROJECT_ID, ignore_missing=False
)
self.conn.load_balancer.delete_l7_rule(
self.L7RULE_ID, l7_policy=self.L7POLICY_ID, ignore_missing=False)
self.L7RULE_ID, l7_policy=self.L7POLICY_ID, ignore_missing=False
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
self.conn.load_balancer.delete_l7_policy(
self.L7POLICY_ID, ignore_missing=False)
self.L7POLICY_ID, ignore_missing=False
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
self.conn.load_balancer.delete_health_monitor(
self.HM_ID, ignore_missing=False)
self.HM_ID, ignore_missing=False
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
self.conn.load_balancer.delete_member(
self.MEMBER_ID, self.POOL_ID, ignore_missing=False)
self.MEMBER_ID, self.POOL_ID, ignore_missing=False
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
self.conn.load_balancer.delete_pool(self.POOL_ID, ignore_missing=False)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
self.conn.load_balancer.delete_listener(self.LISTENER_ID,
ignore_missing=False)
self.conn.load_balancer.delete_listener(
self.LISTENER_ID, ignore_missing=False
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
self.conn.load_balancer.delete_load_balancer(
self.LB_ID, ignore_missing=False)
self.LB_ID, ignore_missing=False
)
super(TestLoadBalancer, self).tearDown()
self.conn.load_balancer.delete_flavor(self.FLAVOR_ID,
ignore_missing=False)
self.conn.load_balancer.delete_flavor(
self.FLAVOR_ID, ignore_missing=False
)
self.conn.load_balancer.delete_flavor_profile(self.FLAVOR_PROFILE_ID,
ignore_missing=False)
self.conn.load_balancer.delete_flavor_profile(
self.FLAVOR_PROFILE_ID, ignore_missing=False
)
self.conn.load_balancer.delete_availability_zone(
self.AVAILABILITY_ZONE_NAME, ignore_missing=False)
self.AVAILABILITY_ZONE_NAME, ignore_missing=False
)
self.conn.load_balancer.delete_availability_zone_profile(
self.AVAILABILITY_ZONE_PROFILE_ID, ignore_missing=False)
self.AVAILABILITY_ZONE_PROFILE_ID, ignore_missing=False
)
def test_lb_find(self):
test_lb = self.conn.load_balancer.find_load_balancer(self.LB_NAME)
@@ -261,7 +321,8 @@ class TestLoadBalancer(base.BaseFunctionalTest):
def test_lb_get_stats(self):
test_lb_stats = self.conn.load_balancer.get_load_balancer_statistics(
self.LB_ID)
self.LB_ID
)
self.assertEqual(0, test_lb_stats.active_connections)
self.assertEqual(0, test_lb_stats.bytes_in)
self.assertEqual(0, test_lb_stats.bytes_out)
@@ -274,29 +335,35 @@ class TestLoadBalancer(base.BaseFunctionalTest):
def test_lb_update(self):
self.conn.load_balancer.update_load_balancer(
self.LB_ID, name=self.UPDATE_NAME)
self.LB_ID, name=self.UPDATE_NAME
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_lb = self.conn.load_balancer.get_load_balancer(self.LB_ID)
self.assertEqual(self.UPDATE_NAME, test_lb.name)
self.conn.load_balancer.update_load_balancer(
self.LB_ID, name=self.LB_NAME)
self.LB_ID, name=self.LB_NAME
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_lb = self.conn.load_balancer.get_load_balancer(self.LB_ID)
self.assertEqual(self.LB_NAME, test_lb.name)
def test_lb_failover(self):
self.conn.load_balancer.failover_load_balancer(self.LB_ID)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_lb = self.conn.load_balancer.get_load_balancer(self.LB_ID)
self.assertEqual(self.LB_NAME, test_lb.name)
def test_listener_find(self):
test_listener = self.conn.load_balancer.find_listener(
self.LISTENER_NAME)
self.LISTENER_NAME
)
self.assertEqual(self.LISTENER_ID, test_listener.id)
def test_listener_get(self):
@@ -308,7 +375,8 @@ class TestLoadBalancer(base.BaseFunctionalTest):
def test_listener_get_stats(self):
test_listener_stats = self.conn.load_balancer.get_listener_statistics(
self.LISTENER_ID)
self.LISTENER_ID
)
self.assertEqual(0, test_listener_stats.active_connections)
self.assertEqual(0, test_listener_stats.bytes_in)
self.assertEqual(0, test_listener_stats.bytes_out)
@@ -323,16 +391,20 @@ class TestLoadBalancer(base.BaseFunctionalTest):
self.conn.load_balancer.get_load_balancer(self.LB_ID)
self.conn.load_balancer.update_listener(
self.LISTENER_ID, name=self.UPDATE_NAME)
self.LISTENER_ID, name=self.UPDATE_NAME
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_listener = self.conn.load_balancer.get_listener(self.LISTENER_ID)
self.assertEqual(self.UPDATE_NAME, test_listener.name)
self.conn.load_balancer.update_listener(
self.LISTENER_ID, name=self.LISTENER_NAME)
self.LISTENER_ID, name=self.LISTENER_NAME
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_listener = self.conn.load_balancer.get_listener(self.LISTENER_ID)
self.assertEqual(self.LISTENER_NAME, test_listener.name)
@@ -353,28 +425,32 @@ class TestLoadBalancer(base.BaseFunctionalTest):
def test_pool_update(self):
self.conn.load_balancer.get_load_balancer(self.LB_ID)
self.conn.load_balancer.update_pool(self.POOL_ID,
name=self.UPDATE_NAME)
self.conn.load_balancer.update_pool(
self.POOL_ID, name=self.UPDATE_NAME
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_pool = self.conn.load_balancer.get_pool(self.POOL_ID)
self.assertEqual(self.UPDATE_NAME, test_pool.name)
self.conn.load_balancer.update_pool(self.POOL_ID,
name=self.POOL_NAME)
self.conn.load_balancer.update_pool(self.POOL_ID, name=self.POOL_NAME)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_pool = self.conn.load_balancer.get_pool(self.POOL_ID)
self.assertEqual(self.POOL_NAME, test_pool.name)
def test_member_find(self):
test_member = self.conn.load_balancer.find_member(self.MEMBER_NAME,
self.POOL_ID)
test_member = self.conn.load_balancer.find_member(
self.MEMBER_NAME, self.POOL_ID
)
self.assertEqual(self.MEMBER_ID, test_member.id)
def test_member_get(self):
test_member = self.conn.load_balancer.get_member(self.MEMBER_ID,
self.POOL_ID)
test_member = self.conn.load_balancer.get_member(
self.MEMBER_ID, self.POOL_ID
)
self.assertEqual(self.MEMBER_NAME, test_member.name)
self.assertEqual(self.MEMBER_ID, test_member.id)
self.assertEqual(self.MEMBER_ADDRESS, test_member.address)
@@ -382,27 +458,34 @@ class TestLoadBalancer(base.BaseFunctionalTest):
self.assertEqual(self.WEIGHT, test_member.weight)
def test_member_list(self):
names = [mb.name for mb in self.conn.load_balancer.members(
self.POOL_ID)]
names = [
mb.name for mb in self.conn.load_balancer.members(self.POOL_ID)
]
self.assertIn(self.MEMBER_NAME, names)
def test_member_update(self):
self.conn.load_balancer.get_load_balancer(self.LB_ID)
self.conn.load_balancer.update_member(self.MEMBER_ID, self.POOL_ID,
name=self.UPDATE_NAME)
self.conn.load_balancer.update_member(
self.MEMBER_ID, self.POOL_ID, name=self.UPDATE_NAME
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
test_member = self.conn.load_balancer.get_member(self.MEMBER_ID,
self.POOL_ID)
self.LB_ID, wait=self._wait_for_timeout
)
test_member = self.conn.load_balancer.get_member(
self.MEMBER_ID, self.POOL_ID
)
self.assertEqual(self.UPDATE_NAME, test_member.name)
self.conn.load_balancer.update_member(self.MEMBER_ID, self.POOL_ID,
name=self.MEMBER_NAME)
self.conn.load_balancer.update_member(
self.MEMBER_ID, self.POOL_ID, name=self.MEMBER_NAME
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
test_member = self.conn.load_balancer.get_member(self.MEMBER_ID,
self.POOL_ID)
self.LB_ID, wait=self._wait_for_timeout
)
test_member = self.conn.load_balancer.get_member(
self.MEMBER_ID, self.POOL_ID
)
self.assertEqual(self.MEMBER_NAME, test_member.name)
def test_health_monitor_find(self):
@@ -425,28 +508,34 @@ class TestLoadBalancer(base.BaseFunctionalTest):
def test_health_monitor_update(self):
self.conn.load_balancer.get_load_balancer(self.LB_ID)
self.conn.load_balancer.update_health_monitor(self.HM_ID,
name=self.UPDATE_NAME)
self.conn.load_balancer.update_health_monitor(
self.HM_ID, name=self.UPDATE_NAME
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_hm = self.conn.load_balancer.get_health_monitor(self.HM_ID)
self.assertEqual(self.UPDATE_NAME, test_hm.name)
self.conn.load_balancer.update_health_monitor(self.HM_ID,
name=self.HM_NAME)
self.conn.load_balancer.update_health_monitor(
self.HM_ID, name=self.HM_NAME
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_hm = self.conn.load_balancer.get_health_monitor(self.HM_ID)
self.assertEqual(self.HM_NAME, test_hm.name)
def test_l7_policy_find(self):
test_l7_policy = self.conn.load_balancer.find_l7_policy(
self.L7POLICY_NAME)
self.L7POLICY_NAME
)
self.assertEqual(self.L7POLICY_ID, test_l7_policy.id)
def test_l7_policy_get(self):
test_l7_policy = self.conn.load_balancer.get_l7_policy(
self.L7POLICY_ID)
self.L7POLICY_ID
)
self.assertEqual(self.L7POLICY_NAME, test_l7_policy.name)
self.assertEqual(self.L7POLICY_ID, test_l7_policy.id)
self.assertEqual(self.ACTION, test_l7_policy.action)
@@ -459,59 +548,80 @@ class TestLoadBalancer(base.BaseFunctionalTest):
self.conn.load_balancer.get_load_balancer(self.LB_ID)
self.conn.load_balancer.update_l7_policy(
self.L7POLICY_ID, name=self.UPDATE_NAME)
self.L7POLICY_ID, name=self.UPDATE_NAME
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_l7_policy = self.conn.load_balancer.get_l7_policy(
self.L7POLICY_ID)
self.L7POLICY_ID
)
self.assertEqual(self.UPDATE_NAME, test_l7_policy.name)
self.conn.load_balancer.update_l7_policy(self.L7POLICY_ID,
name=self.L7POLICY_NAME)
self.conn.load_balancer.update_l7_policy(
self.L7POLICY_ID, name=self.L7POLICY_NAME
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_l7_policy = self.conn.load_balancer.get_l7_policy(
self.L7POLICY_ID)
self.L7POLICY_ID
)
self.assertEqual(self.L7POLICY_NAME, test_l7_policy.name)
def test_l7_rule_find(self):
test_l7_rule = self.conn.load_balancer.find_l7_rule(
self.L7RULE_ID, self.L7POLICY_ID)
self.L7RULE_ID, self.L7POLICY_ID
)
self.assertEqual(self.L7RULE_ID, test_l7_rule.id)
self.assertEqual(self.L7RULE_TYPE, test_l7_rule.type)
def test_l7_rule_get(self):
test_l7_rule = self.conn.load_balancer.get_l7_rule(
self.L7RULE_ID, l7_policy=self.L7POLICY_ID)
self.L7RULE_ID, l7_policy=self.L7POLICY_ID
)
self.assertEqual(self.L7RULE_ID, test_l7_rule.id)
self.assertEqual(self.COMPARE_TYPE, test_l7_rule.compare_type)
self.assertEqual(self.L7RULE_TYPE, test_l7_rule.type)
self.assertEqual(self.L7RULE_VALUE, test_l7_rule.rule_value)
def test_l7_rule_list(self):
ids = [l7.id for l7 in self.conn.load_balancer.l7_rules(
l7_policy=self.L7POLICY_ID)]
ids = [
l7.id
for l7 in self.conn.load_balancer.l7_rules(
l7_policy=self.L7POLICY_ID
)
]
self.assertIn(self.L7RULE_ID, ids)
def test_l7_rule_update(self):
self.conn.load_balancer.get_load_balancer(self.LB_ID)
self.conn.load_balancer.update_l7_rule(self.L7RULE_ID,
l7_policy=self.L7POLICY_ID,
rule_value=self.UPDATE_NAME)
self.conn.load_balancer.update_l7_rule(
self.L7RULE_ID,
l7_policy=self.L7POLICY_ID,
rule_value=self.UPDATE_NAME,
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_l7_rule = self.conn.load_balancer.get_l7_rule(
self.L7RULE_ID, l7_policy=self.L7POLICY_ID)
self.L7RULE_ID, l7_policy=self.L7POLICY_ID
)
self.assertEqual(self.UPDATE_NAME, test_l7_rule.rule_value)
self.conn.load_balancer.update_l7_rule(self.L7RULE_ID,
l7_policy=self.L7POLICY_ID,
rule_value=self.L7RULE_VALUE)
self.conn.load_balancer.update_l7_rule(
self.L7RULE_ID,
l7_policy=self.L7POLICY_ID,
rule_value=self.L7RULE_VALUE,
)
self.conn.load_balancer.wait_for_load_balancer(
self.LB_ID, wait=self._wait_for_timeout)
self.LB_ID, wait=self._wait_for_timeout
)
test_l7_rule = self.conn.load_balancer.get_l7_rule(
self.L7RULE_ID, l7_policy=self.L7POLICY_ID,)
self.L7RULE_ID,
l7_policy=self.L7POLICY_ID,
)
self.assertEqual(self.L7RULE_VALUE, test_l7_rule.rule_value)
def test_quota_list(self):
@@ -527,7 +637,8 @@ class TestLoadBalancer(base.BaseFunctionalTest):
for project_quota in self.conn.load_balancer.quotas():
self.conn.load_balancer.update_quota(project_quota, **attrs)
new_quota = self.conn.load_balancer.get_quota(
project_quota.project_id)
project_quota.project_id
)
self.assertEqual(12345, new_quota.load_balancers)
self.assertEqual(67890, new_quota.pools)
@@ -538,23 +649,28 @@ class TestLoadBalancer(base.BaseFunctionalTest):
providers = self.conn.load_balancer.providers()
# Make sure our default provider is in the list
self.assertTrue(
any(prov['name'] == self.AMPHORA for prov in providers))
any(prov['name'] == self.AMPHORA for prov in providers)
)
def test_provider_flavor_capabilities(self):
capabilities = self.conn.load_balancer.provider_flavor_capabilities(
self.AMPHORA)
self.AMPHORA
)
# Make sure a known capability is in the default provider
self.assertTrue(any(
cap['name'] == 'loadbalancer_topology' for cap in capabilities))
self.assertTrue(
any(cap['name'] == 'loadbalancer_topology' for cap in capabilities)
)
def test_flavor_profile_find(self):
test_profile = self.conn.load_balancer.find_flavor_profile(
self.FLAVOR_PROFILE_NAME)
self.FLAVOR_PROFILE_NAME
)
self.assertEqual(self.FLAVOR_PROFILE_ID, test_profile.id)
def test_flavor_profile_get(self):
test_flavor_profile = self.conn.load_balancer.get_flavor_profile(
self.FLAVOR_PROFILE_ID)
self.FLAVOR_PROFILE_ID
)
self.assertEqual(self.FLAVOR_PROFILE_NAME, test_flavor_profile.name)
self.assertEqual(self.FLAVOR_PROFILE_ID, test_flavor_profile.id)
self.assertEqual(self.AMPHORA, test_flavor_profile.provider_name)
@@ -566,15 +682,19 @@ class TestLoadBalancer(base.BaseFunctionalTest):
def test_flavor_profile_update(self):
self.conn.load_balancer.update_flavor_profile(
self.FLAVOR_PROFILE_ID, name=self.UPDATE_NAME)
self.FLAVOR_PROFILE_ID, name=self.UPDATE_NAME
)
test_flavor_profile = self.conn.load_balancer.get_flavor_profile(
self.FLAVOR_PROFILE_ID)
self.FLAVOR_PROFILE_ID
)
self.assertEqual(self.UPDATE_NAME, test_flavor_profile.name)
self.conn.load_balancer.update_flavor_profile(
self.FLAVOR_PROFILE_ID, name=self.FLAVOR_PROFILE_NAME)
self.FLAVOR_PROFILE_ID, name=self.FLAVOR_PROFILE_NAME
)
test_flavor_profile = self.conn.load_balancer.get_flavor_profile(
self.FLAVOR_PROFILE_ID)
self.FLAVOR_PROFILE_ID
)
self.assertEqual(self.FLAVOR_PROFILE_NAME, test_flavor_profile.name)
def test_flavor_find(self):
@@ -594,12 +714,14 @@ class TestLoadBalancer(base.BaseFunctionalTest):
def test_flavor_update(self):
self.conn.load_balancer.update_flavor(
self.FLAVOR_ID, name=self.UPDATE_NAME)
self.FLAVOR_ID, name=self.UPDATE_NAME
)
test_flavor = self.conn.load_balancer.get_flavor(self.FLAVOR_ID)
self.assertEqual(self.UPDATE_NAME, test_flavor.name)
self.conn.load_balancer.update_flavor(
self.FLAVOR_ID, name=self.FLAVOR_NAME)
self.FLAVOR_ID, name=self.FLAVOR_NAME
)
test_flavor = self.conn.load_balancer.get_flavor(self.FLAVOR_ID)
self.assertEqual(self.FLAVOR_NAME, test_flavor.name)
@@ -627,75 +749,108 @@ class TestLoadBalancer(base.BaseFunctionalTest):
def test_availability_zone_profile_find(self):
test_profile = self.conn.load_balancer.find_availability_zone_profile(
self.AVAILABILITY_ZONE_PROFILE_NAME)
self.AVAILABILITY_ZONE_PROFILE_NAME
)
self.assertEqual(self.AVAILABILITY_ZONE_PROFILE_ID, test_profile.id)
def test_availability_zone_profile_get(self):
test_availability_zone_profile = \
test_availability_zone_profile = (
self.conn.load_balancer.get_availability_zone_profile(
self.AVAILABILITY_ZONE_PROFILE_ID)
self.assertEqual(self.AVAILABILITY_ZONE_PROFILE_NAME,
test_availability_zone_profile.name)
self.assertEqual(self.AVAILABILITY_ZONE_PROFILE_ID,
test_availability_zone_profile.id)
self.assertEqual(self.AMPHORA,
test_availability_zone_profile.provider_name)
self.assertEqual(self.AVAILABILITY_ZONE_DATA,
test_availability_zone_profile.availability_zone_data)
self.AVAILABILITY_ZONE_PROFILE_ID
)
)
self.assertEqual(
self.AVAILABILITY_ZONE_PROFILE_NAME,
test_availability_zone_profile.name,
)
self.assertEqual(
self.AVAILABILITY_ZONE_PROFILE_ID,
test_availability_zone_profile.id,
)
self.assertEqual(
self.AMPHORA, test_availability_zone_profile.provider_name
)
self.assertEqual(
self.AVAILABILITY_ZONE_DATA,
test_availability_zone_profile.availability_zone_data,
)
def test_availability_zone_profile_list(self):
names = [az.name for az in
self.conn.load_balancer.availability_zone_profiles()]
names = [
az.name
for az in self.conn.load_balancer.availability_zone_profiles()
]
self.assertIn(self.AVAILABILITY_ZONE_PROFILE_NAME, names)
def test_availability_zone_profile_update(self):
self.conn.load_balancer.update_availability_zone_profile(
self.AVAILABILITY_ZONE_PROFILE_ID, name=self.UPDATE_NAME)
test_availability_zone_profile = \
self.AVAILABILITY_ZONE_PROFILE_ID, name=self.UPDATE_NAME
)
test_availability_zone_profile = (
self.conn.load_balancer.get_availability_zone_profile(
self.AVAILABILITY_ZONE_PROFILE_ID)
self.AVAILABILITY_ZONE_PROFILE_ID
)
)
self.assertEqual(self.UPDATE_NAME, test_availability_zone_profile.name)
self.conn.load_balancer.update_availability_zone_profile(
self.AVAILABILITY_ZONE_PROFILE_ID,
name=self.AVAILABILITY_ZONE_PROFILE_NAME)
test_availability_zone_profile = \
name=self.AVAILABILITY_ZONE_PROFILE_NAME,
)
test_availability_zone_profile = (
self.conn.load_balancer.get_availability_zone_profile(
self.AVAILABILITY_ZONE_PROFILE_ID)
self.assertEqual(self.AVAILABILITY_ZONE_PROFILE_NAME,
test_availability_zone_profile.name)
self.AVAILABILITY_ZONE_PROFILE_ID
)
)
self.assertEqual(
self.AVAILABILITY_ZONE_PROFILE_NAME,
test_availability_zone_profile.name,
)
def test_availability_zone_find(self):
test_availability_zone = \
test_availability_zone = (
self.conn.load_balancer.find_availability_zone(
self.AVAILABILITY_ZONE_NAME)
self.assertEqual(self.AVAILABILITY_ZONE_NAME,
test_availability_zone.name)
self.AVAILABILITY_ZONE_NAME
)
)
self.assertEqual(
self.AVAILABILITY_ZONE_NAME, test_availability_zone.name
)
def test_availability_zone_get(self):
test_availability_zone = self.conn.load_balancer.get_availability_zone(
self.AVAILABILITY_ZONE_NAME)
self.assertEqual(self.AVAILABILITY_ZONE_NAME,
test_availability_zone.name)
self.AVAILABILITY_ZONE_NAME
)
self.assertEqual(
self.AVAILABILITY_ZONE_NAME, test_availability_zone.name
)
self.assertEqual(self.DESCRIPTION, test_availability_zone.description)
self.assertEqual(self.AVAILABILITY_ZONE_PROFILE_ID,
test_availability_zone.availability_zone_profile_id)
self.assertEqual(
self.AVAILABILITY_ZONE_PROFILE_ID,
test_availability_zone.availability_zone_profile_id,
)
def test_availability_zone_list(self):
names = [az.name for az in
self.conn.load_balancer.availability_zones()]
names = [
az.name for az in self.conn.load_balancer.availability_zones()
]
self.assertIn(self.AVAILABILITY_ZONE_NAME, names)
def test_availability_zone_update(self):
self.conn.load_balancer.update_availability_zone(
self.AVAILABILITY_ZONE_NAME, description=self.UPDATE_DESCRIPTION)
self.AVAILABILITY_ZONE_NAME, description=self.UPDATE_DESCRIPTION
)
test_availability_zone = self.conn.load_balancer.get_availability_zone(
self.AVAILABILITY_ZONE_NAME)
self.assertEqual(self.UPDATE_DESCRIPTION,
test_availability_zone.description)
self.AVAILABILITY_ZONE_NAME
)
self.assertEqual(
self.UPDATE_DESCRIPTION, test_availability_zone.description
)
self.conn.load_balancer.update_availability_zone(
self.AVAILABILITY_ZONE_NAME, description=self.DESCRIPTION)
self.AVAILABILITY_ZONE_NAME, description=self.DESCRIPTION
)
test_availability_zone = self.conn.load_balancer.get_availability_zone(
self.AVAILABILITY_ZONE_NAME)
self.AVAILABILITY_ZONE_NAME
)
self.assertEqual(self.DESCRIPTION, test_availability_zone.description)

View File

@@ -48,12 +48,11 @@ EXAMPLE = {
'created_at': '2017-05-10T18:14:44',
'updated_at': '2017-05-10T23:08:12',
'image_id': IMAGE_ID,
'compute_flavor': COMPUTE_FLAVOR
'compute_flavor': COMPUTE_FLAVOR,
}
class TestAmphora(base.TestCase):
def test_basic(self):
test_amphora = amphora.Amphora()
self.assertEqual('amphora', test_amphora.resource_key)
@@ -75,13 +74,15 @@ class TestAmphora(base.TestCase):
self.assertEqual(EXAMPLE['ha_ip'], test_amphora.ha_ip)
self.assertEqual(VRRP_PORT_ID, test_amphora.vrrp_port_id)
self.assertEqual(HA_PORT_ID, test_amphora.ha_port_id)
self.assertEqual(EXAMPLE['cert_expiration'],
test_amphora.cert_expiration)
self.assertEqual(
EXAMPLE['cert_expiration'], test_amphora.cert_expiration
)
self.assertEqual(EXAMPLE['cert_busy'], test_amphora.cert_busy)
self.assertEqual(EXAMPLE['role'], test_amphora.role)
self.assertEqual(EXAMPLE['status'], test_amphora.status)
self.assertEqual(EXAMPLE['vrrp_interface'],
test_amphora.vrrp_interface)
self.assertEqual(
EXAMPLE['vrrp_interface'], test_amphora.vrrp_interface
)
self.assertEqual(EXAMPLE['vrrp_id'], test_amphora.vrrp_id)
self.assertEqual(EXAMPLE['vrrp_priority'], test_amphora.vrrp_priority)
self.assertEqual(EXAMPLE['cached_zone'], test_amphora.cached_zone)
@@ -91,38 +92,41 @@ class TestAmphora(base.TestCase):
self.assertEqual(COMPUTE_FLAVOR, test_amphora.compute_flavor)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'id': 'id',
'loadbalancer_id': 'loadbalancer_id',
'compute_id': 'compute_id',
'lb_network_ip': 'lb_network_ip',
'vrrp_ip': 'vrrp_ip',
'ha_ip': 'ha_ip',
'vrrp_port_id': 'vrrp_port_id',
'ha_port_id': 'ha_port_id',
'cert_expiration': 'cert_expiration',
'cert_busy': 'cert_busy',
'role': 'role',
'status': 'status',
'vrrp_interface': 'vrrp_interface',
'vrrp_id': 'vrrp_id',
'vrrp_priority': 'vrrp_priority',
'cached_zone': 'cached_zone',
'created_at': 'created_at',
'updated_at': 'updated_at',
'image_id': 'image_id',
'image_id': 'image_id'
},
test_amphora._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'id': 'id',
'loadbalancer_id': 'loadbalancer_id',
'compute_id': 'compute_id',
'lb_network_ip': 'lb_network_ip',
'vrrp_ip': 'vrrp_ip',
'ha_ip': 'ha_ip',
'vrrp_port_id': 'vrrp_port_id',
'ha_port_id': 'ha_port_id',
'cert_expiration': 'cert_expiration',
'cert_busy': 'cert_busy',
'role': 'role',
'status': 'status',
'vrrp_interface': 'vrrp_interface',
'vrrp_id': 'vrrp_id',
'vrrp_priority': 'vrrp_priority',
'cached_zone': 'cached_zone',
'created_at': 'created_at',
'updated_at': 'updated_at',
'image_id': 'image_id',
'image_id': 'image_id',
},
test_amphora._query_mapping._mapping,
)
class TestAmphoraConfig(base.TestCase):
def test_basic(self):
test_amp_config = amphora.AmphoraConfig()
self.assertEqual('/octavia/amphorae/%(amphora_id)s/config',
test_amp_config.base_path)
self.assertEqual(
'/octavia/amphorae/%(amphora_id)s/config',
test_amp_config.base_path,
)
self.assertFalse(test_amp_config.allow_create)
self.assertFalse(test_amp_config.allow_fetch)
self.assertTrue(test_amp_config.allow_commit)
@@ -131,11 +135,12 @@ class TestAmphoraConfig(base.TestCase):
class TestAmphoraFailover(base.TestCase):
def test_basic(self):
test_amp_failover = amphora.AmphoraFailover()
self.assertEqual('/octavia/amphorae/%(amphora_id)s/failover',
test_amp_failover.base_path)
self.assertEqual(
'/octavia/amphorae/%(amphora_id)s/failover',
test_amp_failover.base_path,
)
self.assertFalse(test_amp_failover.allow_create)
self.assertFalse(test_amp_failover.allow_fetch)
self.assertTrue(test_amp_failover.allow_commit)

View File

@@ -22,19 +22,22 @@ EXAMPLE = {
'name': 'strawberry',
'description': 'tasty',
'is_enabled': False,
'availability_zone_profile_id': AVAILABILITY_ZONE_PROFILE_ID}
'availability_zone_profile_id': AVAILABILITY_ZONE_PROFILE_ID,
}
class TestAvailabilityZone(base.TestCase):
def test_basic(self):
test_availability_zone = availability_zone.AvailabilityZone()
self.assertEqual('availability_zone',
test_availability_zone.resource_key)
self.assertEqual('availability_zones',
test_availability_zone.resources_key)
self.assertEqual('/lbaas/availabilityzones',
test_availability_zone.base_path)
self.assertEqual(
'availability_zone', test_availability_zone.resource_key
)
self.assertEqual(
'availability_zones', test_availability_zone.resources_key
)
self.assertEqual(
'/lbaas/availabilityzones', test_availability_zone.base_path
)
self.assertTrue(test_availability_zone.allow_create)
self.assertTrue(test_availability_zone.allow_fetch)
self.assertTrue(test_availability_zone.allow_commit)
@@ -44,17 +47,23 @@ class TestAvailabilityZone(base.TestCase):
def test_make_it(self):
test_availability_zone = availability_zone.AvailabilityZone(**EXAMPLE)
self.assertEqual(EXAMPLE['name'], test_availability_zone.name)
self.assertEqual(EXAMPLE['description'],
test_availability_zone.description)
self.assertEqual(
EXAMPLE['description'], test_availability_zone.description
)
self.assertFalse(test_availability_zone.is_enabled)
self.assertEqual(EXAMPLE['availability_zone_profile_id'],
test_availability_zone.availability_zone_profile_id)
self.assertEqual(
EXAMPLE['availability_zone_profile_id'],
test_availability_zone.availability_zone_profile_id,
)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'name': 'name',
'description': 'description',
'is_enabled': 'enabled',
'availability_zone_profile_id': 'availability_zone_profile_id'},
test_availability_zone._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'name': 'name',
'description': 'description',
'is_enabled': 'enabled',
'availability_zone_profile_id': 'availability_zone_profile_id',
},
test_availability_zone._query_mapping._mapping,
)

View File

@@ -22,19 +22,22 @@ EXAMPLE = {
'id': IDENTIFIER,
'name': 'acidic',
'provider_name': 'best',
'availability_zone_data': '{"loadbalancer_topology": "SINGLE"}'}
'availability_zone_data': '{"loadbalancer_topology": "SINGLE"}',
}
class TestAvailabilityZoneProfile(base.TestCase):
def test_basic(self):
test_profile = availability_zone_profile.AvailabilityZoneProfile()
self.assertEqual('availability_zone_profile',
test_profile.resource_key)
self.assertEqual('availability_zone_profiles',
test_profile.resources_key)
self.assertEqual('/lbaas/availabilityzoneprofiles',
test_profile.base_path)
self.assertEqual(
'availability_zone_profile', test_profile.resource_key
)
self.assertEqual(
'availability_zone_profiles', test_profile.resources_key
)
self.assertEqual(
'/lbaas/availabilityzoneprofiles', test_profile.base_path
)
self.assertTrue(test_profile.allow_create)
self.assertTrue(test_profile.allow_fetch)
self.assertTrue(test_profile.allow_commit)
@@ -43,18 +46,24 @@ class TestAvailabilityZoneProfile(base.TestCase):
def test_make_it(self):
test_profile = availability_zone_profile.AvailabilityZoneProfile(
**EXAMPLE)
**EXAMPLE
)
self.assertEqual(EXAMPLE['id'], test_profile.id)
self.assertEqual(EXAMPLE['name'], test_profile.name)
self.assertEqual(EXAMPLE['provider_name'], test_profile.provider_name)
self.assertEqual(EXAMPLE['availability_zone_data'],
test_profile.availability_zone_data)
self.assertEqual(
EXAMPLE['availability_zone_data'],
test_profile.availability_zone_data,
)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'id': 'id',
'name': 'name',
'provider_name': 'provider_name',
'availability_zone_data': 'availability_zone_data'},
test_profile._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'id': 'id',
'name': 'name',
'provider_name': 'provider_name',
'availability_zone_data': 'availability_zone_data',
},
test_profile._query_mapping._mapping,
)

View File

@@ -25,11 +25,11 @@ EXAMPLE = {
'name': 'strawberry',
'description': 'tasty',
'is_enabled': False,
'flavor_profile_id': FLAVOR_PROFILE_ID}
'flavor_profile_id': FLAVOR_PROFILE_ID,
}
class TestFlavor(base.TestCase):
def test_basic(self):
test_flavor = flavor.Flavor()
self.assertEqual('flavor', test_flavor.resource_key)
@@ -47,15 +47,19 @@ class TestFlavor(base.TestCase):
self.assertEqual(EXAMPLE['name'], test_flavor.name)
self.assertEqual(EXAMPLE['description'], test_flavor.description)
self.assertFalse(test_flavor.is_enabled)
self.assertEqual(EXAMPLE['flavor_profile_id'],
test_flavor.flavor_profile_id)
self.assertEqual(
EXAMPLE['flavor_profile_id'], test_flavor.flavor_profile_id
)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'id': 'id',
'name': 'name',
'description': 'description',
'is_enabled': 'enabled',
'flavor_profile_id': 'flavor_profile_id'},
test_flavor._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'id': 'id',
'name': 'name',
'description': 'description',
'is_enabled': 'enabled',
'flavor_profile_id': 'flavor_profile_id',
},
test_flavor._query_mapping._mapping,
)

View File

@@ -23,11 +23,11 @@ EXAMPLE = {
'id': IDENTIFIER,
'name': 'acidic',
'provider_name': 'best',
'flavor_data': '{"loadbalancer_topology": "SINGLE"}'}
'flavor_data': '{"loadbalancer_topology": "SINGLE"}',
}
class TestFlavorProfile(base.TestCase):
def test_basic(self):
test_profile = flavor_profile.FlavorProfile()
self.assertEqual('flavorprofile', test_profile.resource_key)
@@ -47,10 +47,13 @@ class TestFlavorProfile(base.TestCase):
self.assertEqual(EXAMPLE['flavor_data'], test_profile.flavor_data)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'id': 'id',
'name': 'name',
'provider_name': 'provider_name',
'flavor_data': 'flavor_data'},
test_profile._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'id': 'id',
'name': 'name',
'provider_name': 'provider_name',
'flavor_data': 'flavor_data',
},
test_profile._query_mapping._mapping,
)

View File

@@ -35,12 +35,11 @@ EXAMPLE = {
'timeout': 4,
'type': 'HTTP',
'updated_at': '2017-07-17T12:16:57.233772',
'url_path': '/health_page.html'
'url_path': '/health_page.html',
}
class TestPoolHealthMonitor(base.TestCase):
def test_basic(self):
test_hm = health_monitor.HealthMonitor()
self.assertEqual('healthmonitor', test_hm.resource_key)
@@ -67,36 +66,38 @@ class TestPoolHealthMonitor(base.TestCase):
self.assertEqual(EXAMPLE['pools'], test_hm.pools)
self.assertEqual(EXAMPLE['pool_id'], test_hm.pool_id)
self.assertEqual(EXAMPLE['project_id'], test_hm.project_id)
self.assertEqual(EXAMPLE['provisioning_status'],
test_hm.provisioning_status)
self.assertEqual(
EXAMPLE['provisioning_status'], test_hm.provisioning_status
)
self.assertEqual(EXAMPLE['timeout'], test_hm.timeout)
self.assertEqual(EXAMPLE['type'], test_hm.type)
self.assertEqual(EXAMPLE['updated_at'], test_hm.updated_at)
self.assertEqual(EXAMPLE['url_path'], test_hm.url_path)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'created_at': 'created_at',
'updated_at': 'updated_at',
'name': 'name',
'project_id': 'project_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'delay': 'delay',
'expected_codes': 'expected_codes',
'http_method': 'http_method',
'max_retries': 'max_retries',
'max_retries_down': 'max_retries_down',
'pool_id': 'pool_id',
'timeout': 'timeout',
'type': 'type',
'url_path': 'url_path'
},
test_hm._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'created_at': 'created_at',
'updated_at': 'updated_at',
'name': 'name',
'project_id': 'project_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'delay': 'delay',
'expected_codes': 'expected_codes',
'http_method': 'http_method',
'max_retries': 'max_retries',
'max_retries_down': 'max_retries_down',
'pool_id': 'pool_id',
'timeout': 'timeout',
'type': 'type',
'url_path': 'url_path',
},
test_hm._query_mapping._mapping,
)

View File

@@ -37,7 +37,6 @@ EXAMPLE = {
class TestL7Policy(base.TestCase):
def test_basic(self):
test_l7_policy = l7_policy.L7Policy()
self.assertEqual('l7policy', test_l7_policy.resource_key)
@@ -58,39 +57,44 @@ class TestL7Policy(base.TestCase):
self.assertEqual(EXAMPLE['id'], test_l7_policy.id)
self.assertEqual(EXAMPLE['listener_id'], test_l7_policy.listener_id)
self.assertEqual(EXAMPLE['name'], test_l7_policy.name)
self.assertEqual(EXAMPLE['operating_status'],
test_l7_policy.operating_status)
self.assertEqual(
EXAMPLE['operating_status'], test_l7_policy.operating_status
)
self.assertEqual(EXAMPLE['position'], test_l7_policy.position)
self.assertEqual(EXAMPLE['project_id'], test_l7_policy.project_id)
self.assertEqual(EXAMPLE['provisioning_status'],
test_l7_policy.provisioning_status)
self.assertEqual(EXAMPLE['redirect_pool_id'],
test_l7_policy.redirect_pool_id)
self.assertEqual(EXAMPLE['redirect_prefix'],
test_l7_policy.redirect_prefix)
self.assertEqual(
EXAMPLE['provisioning_status'], test_l7_policy.provisioning_status
)
self.assertEqual(
EXAMPLE['redirect_pool_id'], test_l7_policy.redirect_pool_id
)
self.assertEqual(
EXAMPLE['redirect_prefix'], test_l7_policy.redirect_prefix
)
self.assertEqual(EXAMPLE['redirect_url'], test_l7_policy.redirect_url)
self.assertEqual(EXAMPLE['rules'], test_l7_policy.rules)
self.assertEqual(EXAMPLE['updated_at'], test_l7_policy.updated_at)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'name': 'name',
'description': 'description',
'project_id': 'project_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'action': 'action',
'listener_id': 'listener_id',
'position': 'position',
'redirect_pool_id': 'redirect_pool_id',
'redirect_url': 'redirect_url',
'redirect_prefix': 'redirect_prefix'
},
test_l7_policy._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'name': 'name',
'description': 'description',
'project_id': 'project_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'action': 'action',
'listener_id': 'listener_id',
'position': 'position',
'redirect_pool_id': 'redirect_pool_id',
'redirect_url': 'redirect_url',
'redirect_prefix': 'redirect_prefix',
},
test_l7_policy._query_mapping._mapping,
)

View File

@@ -29,18 +29,18 @@ EXAMPLE = {
'provisioning_status': 'ACTIVE',
'type': 'COOKIE',
'updated_at': '2017-08-17T12:16:57.233772',
'value': 'chocolate'
'value': 'chocolate',
}
class TestL7Rule(base.TestCase):
def test_basic(self):
test_l7rule = l7_rule.L7Rule()
self.assertEqual('rule', test_l7rule.resource_key)
self.assertEqual('rules', test_l7rule.resources_key)
self.assertEqual('/lbaas/l7policies/%(l7policy_id)s/rules',
test_l7rule.base_path)
self.assertEqual(
'/lbaas/l7policies/%(l7policy_id)s/rules', test_l7rule.base_path
)
self.assertTrue(test_l7rule.allow_create)
self.assertTrue(test_l7rule.allow_fetch)
self.assertTrue(test_l7rule.allow_commit)
@@ -56,34 +56,37 @@ class TestL7Rule(base.TestCase):
self.assertEqual(EXAMPLE['invert'], test_l7rule.invert)
self.assertEqual(EXAMPLE['key'], test_l7rule.key)
self.assertEqual(EXAMPLE['l7_policy_id'], test_l7rule.l7_policy_id)
self.assertEqual(EXAMPLE['operating_status'],
test_l7rule.operating_status)
self.assertEqual(
EXAMPLE['operating_status'], test_l7rule.operating_status
)
self.assertEqual(EXAMPLE['project_id'], test_l7rule.project_id)
self.assertEqual(EXAMPLE['provisioning_status'],
test_l7rule.provisioning_status)
self.assertEqual(
EXAMPLE['provisioning_status'], test_l7rule.provisioning_status
)
self.assertEqual(EXAMPLE['type'], test_l7rule.type)
self.assertEqual(EXAMPLE['updated_at'], test_l7rule.updated_at)
self.assertEqual(EXAMPLE['value'], test_l7rule.rule_value)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'created_at': 'created_at',
'updated_at': 'updated_at',
'project_id': 'project_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'compare_type': 'compare_type',
'invert': 'invert',
'key': 'key',
'type': 'type',
'rule_value': 'rule_value',
'l7_policy_id': 'l7policy_id'
},
test_l7rule._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'created_at': 'created_at',
'updated_at': 'updated_at',
'project_id': 'project_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'compare_type': 'compare_type',
'invert': 'invert',
'key': 'key',
'type': 'type',
'rule_value': 'rule_value',
'l7_policy_id': 'l7policy_id',
},
test_l7rule._query_mapping._mapping,
)

View File

@@ -31,8 +31,10 @@ EXAMPLE = {
'project_id': uuid.uuid4(),
'protocol': 'TEST_PROTOCOL',
'protocol_port': 10,
'default_tls_container_ref': ('http://198.51.100.10:9311/v1/containers/'
'a570068c-d295-4780-91d4-3046a325db51'),
'default_tls_container_ref': (
'http://198.51.100.10:9311/v1/containers/'
'a570068c-d295-4780-91d4-3046a325db51'
),
'sni_container_refs': [],
'created_at': '2017-07-17T12:14:57.233772',
'updated_at': '2017-07-17T12:16:57.233772',
@@ -44,7 +46,7 @@ EXAMPLE = {
'timeout_tcp_inspect': 0,
'tls_ciphers': 'ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-GCM-SHA256',
'tls_versions': ['TLSv1.1', 'TLSv1.2'],
'alpn_protocols': ['h2', 'http/1.1', 'http/1.0']
'alpn_protocols': ['h2', 'http/1.1', 'http/1.0'],
}
EXAMPLE_STATS = {
@@ -52,12 +54,11 @@ EXAMPLE_STATS = {
'bytes_in': 2,
'bytes_out': 3,
'request_errors': 4,
'total_connections': 5
'total_connections': 5,
}
class TestListener(base.TestCase):
def test_basic(self):
test_listener = listener.Listener()
self.assertEqual('listener', test_listener.resource_key)
@@ -73,90 +74,103 @@ class TestListener(base.TestCase):
test_listener = listener.Listener(**EXAMPLE)
self.assertTrue(test_listener.is_admin_state_up)
self.assertEqual(EXAMPLE['allowed_cidrs'], test_listener.allowed_cidrs)
self.assertEqual(EXAMPLE['connection_limit'],
test_listener.connection_limit)
self.assertEqual(EXAMPLE['default_pool_id'],
test_listener.default_pool_id)
self.assertEqual(
EXAMPLE['connection_limit'], test_listener.connection_limit
)
self.assertEqual(
EXAMPLE['default_pool_id'], test_listener.default_pool_id
)
self.assertEqual(EXAMPLE['description'], test_listener.description)
self.assertEqual(EXAMPLE['id'], test_listener.id)
self.assertEqual(EXAMPLE['insert_headers'],
test_listener.insert_headers)
self.assertEqual(EXAMPLE['l7policies'],
test_listener.l7_policies)
self.assertEqual(EXAMPLE['loadbalancers'],
test_listener.load_balancers)
self.assertEqual(
EXAMPLE['insert_headers'], test_listener.insert_headers
)
self.assertEqual(EXAMPLE['l7policies'], test_listener.l7_policies)
self.assertEqual(
EXAMPLE['loadbalancers'], test_listener.load_balancers
)
self.assertEqual(EXAMPLE['name'], test_listener.name)
self.assertEqual(EXAMPLE['project_id'], test_listener.project_id)
self.assertEqual(EXAMPLE['protocol'], test_listener.protocol)
self.assertEqual(EXAMPLE['protocol_port'], test_listener.protocol_port)
self.assertEqual(EXAMPLE['default_tls_container_ref'],
test_listener.default_tls_container_ref)
self.assertEqual(EXAMPLE['sni_container_refs'],
test_listener.sni_container_refs)
self.assertEqual(
EXAMPLE['default_tls_container_ref'],
test_listener.default_tls_container_ref,
)
self.assertEqual(
EXAMPLE['sni_container_refs'], test_listener.sni_container_refs
)
self.assertEqual(EXAMPLE['created_at'], test_listener.created_at)
self.assertEqual(EXAMPLE['updated_at'], test_listener.updated_at)
self.assertEqual(EXAMPLE['provisioning_status'],
test_listener.provisioning_status)
self.assertEqual(EXAMPLE['operating_status'],
test_listener.operating_status)
self.assertEqual(EXAMPLE['timeout_client_data'],
test_listener.timeout_client_data)
self.assertEqual(EXAMPLE['timeout_member_connect'],
test_listener.timeout_member_connect)
self.assertEqual(EXAMPLE['timeout_member_data'],
test_listener.timeout_member_data)
self.assertEqual(EXAMPLE['timeout_tcp_inspect'],
test_listener.timeout_tcp_inspect)
self.assertEqual(EXAMPLE['tls_ciphers'],
test_listener.tls_ciphers)
self.assertEqual(EXAMPLE['tls_versions'],
test_listener.tls_versions)
self.assertEqual(EXAMPLE['alpn_protocols'],
test_listener.alpn_protocols)
self.assertEqual(
EXAMPLE['provisioning_status'], test_listener.provisioning_status
)
self.assertEqual(
EXAMPLE['operating_status'], test_listener.operating_status
)
self.assertEqual(
EXAMPLE['timeout_client_data'], test_listener.timeout_client_data
)
self.assertEqual(
EXAMPLE['timeout_member_connect'],
test_listener.timeout_member_connect,
)
self.assertEqual(
EXAMPLE['timeout_member_data'], test_listener.timeout_member_data
)
self.assertEqual(
EXAMPLE['timeout_tcp_inspect'], test_listener.timeout_tcp_inspect
)
self.assertEqual(EXAMPLE['tls_ciphers'], test_listener.tls_ciphers)
self.assertEqual(EXAMPLE['tls_versions'], test_listener.tls_versions)
self.assertEqual(
EXAMPLE['alpn_protocols'], test_listener.alpn_protocols
)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'created_at': 'created_at',
'updated_at': 'updated_at',
'description': 'description',
'name': 'name',
'project_id': 'project_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'allowed_cidrs': 'allowed_cidrs',
'connection_limit': 'connection_limit',
'default_pool_id': 'default_pool_id',
'default_tls_container_ref': 'default_tls_container_ref',
'sni_container_refs': 'sni_container_refs',
'insert_headers': 'insert_headers',
'load_balancer_id': 'load_balancer_id',
'protocol': 'protocol',
'protocol_port': 'protocol_port',
'timeout_client_data': 'timeout_client_data',
'timeout_member_connect': 'timeout_member_connect',
'timeout_member_data': 'timeout_member_data',
'timeout_tcp_inspect': 'timeout_tcp_inspect',
'tls_ciphers': 'tls_ciphers',
'tls_versions': 'tls_versions',
'alpn_protocols': 'alpn_protocols',
},
test_listener._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'created_at': 'created_at',
'updated_at': 'updated_at',
'description': 'description',
'name': 'name',
'project_id': 'project_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'allowed_cidrs': 'allowed_cidrs',
'connection_limit': 'connection_limit',
'default_pool_id': 'default_pool_id',
'default_tls_container_ref': 'default_tls_container_ref',
'sni_container_refs': 'sni_container_refs',
'insert_headers': 'insert_headers',
'load_balancer_id': 'load_balancer_id',
'protocol': 'protocol',
'protocol_port': 'protocol_port',
'timeout_client_data': 'timeout_client_data',
'timeout_member_connect': 'timeout_member_connect',
'timeout_member_data': 'timeout_member_data',
'timeout_tcp_inspect': 'timeout_tcp_inspect',
'tls_ciphers': 'tls_ciphers',
'tls_versions': 'tls_versions',
'alpn_protocols': 'alpn_protocols',
},
test_listener._query_mapping._mapping,
)
class TestListenerStats(base.TestCase):
def test_basic(self):
test_listener = listener.ListenerStats()
self.assertEqual('stats', test_listener.resource_key)
self.assertEqual('/lbaas/listeners/%(listener_id)s/stats',
test_listener.base_path)
self.assertEqual(
'/lbaas/listeners/%(listener_id)s/stats', test_listener.base_path
)
self.assertFalse(test_listener.allow_create)
self.assertTrue(test_listener.allow_fetch)
self.assertFalse(test_listener.allow_delete)
@@ -165,13 +179,15 @@ class TestListenerStats(base.TestCase):
def test_make_it(self):
test_listener = listener.ListenerStats(**EXAMPLE_STATS)
self.assertEqual(EXAMPLE_STATS['active_connections'],
test_listener.active_connections)
self.assertEqual(EXAMPLE_STATS['bytes_in'],
test_listener.bytes_in)
self.assertEqual(EXAMPLE_STATS['bytes_out'],
test_listener.bytes_out)
self.assertEqual(EXAMPLE_STATS['request_errors'],
test_listener.request_errors)
self.assertEqual(EXAMPLE_STATS['total_connections'],
test_listener.total_connections)
self.assertEqual(
EXAMPLE_STATS['active_connections'],
test_listener.active_connections,
)
self.assertEqual(EXAMPLE_STATS['bytes_in'], test_listener.bytes_in)
self.assertEqual(EXAMPLE_STATS['bytes_out'], test_listener.bytes_out)
self.assertEqual(
EXAMPLE_STATS['request_errors'], test_listener.request_errors
)
self.assertEqual(
EXAMPLE_STATS['total_connections'], test_listener.total_connections
)

View File

@@ -38,14 +38,9 @@ EXAMPLE = {
'vip_subnet_id': uuid.uuid4(),
'vip_qos_policy_id': uuid.uuid4(),
'additional_vips': [
{
'subnet_id': uuid.uuid4(),
'ip_address': '192.0.2.6'
}, {
'subnet_id': uuid.uuid4(),
'ip_address': '192.0.2.7'
}
]
{'subnet_id': uuid.uuid4(), 'ip_address': '192.0.2.6'},
{'subnet_id': uuid.uuid4(), 'ip_address': '192.0.2.7'},
],
}
EXAMPLE_STATS = {
@@ -53,18 +48,16 @@ EXAMPLE_STATS = {
'bytes_in': 2,
'bytes_out': 3,
'request_errors': 4,
'total_connections': 5
'total_connections': 5,
}
class TestLoadBalancer(base.TestCase):
def test_basic(self):
test_load_balancer = load_balancer.LoadBalancer()
self.assertEqual('loadbalancer', test_load_balancer.resource_key)
self.assertEqual('loadbalancers', test_load_balancer.resources_key)
self.assertEqual('/lbaas/loadbalancers',
test_load_balancer.base_path)
self.assertEqual('/lbaas/loadbalancers', test_load_balancer.base_path)
self.assertTrue(test_load_balancer.allow_create)
self.assertTrue(test_load_balancer.allow_fetch)
self.assertTrue(test_load_balancer.allow_delete)
@@ -74,59 +67,72 @@ class TestLoadBalancer(base.TestCase):
def test_make_it(self):
test_load_balancer = load_balancer.LoadBalancer(**EXAMPLE)
self.assertTrue(test_load_balancer.is_admin_state_up)
self.assertEqual(EXAMPLE['availability_zone'],
test_load_balancer.availability_zone)
self.assertEqual(
EXAMPLE['availability_zone'], test_load_balancer.availability_zone
)
self.assertEqual(EXAMPLE['created_at'], test_load_balancer.created_at)
self.assertEqual(EXAMPLE['description'],
test_load_balancer.description)
self.assertEqual(
EXAMPLE['description'], test_load_balancer.description
)
self.assertEqual(EXAMPLE['flavor_id'], test_load_balancer.flavor_id)
self.assertEqual(EXAMPLE['id'], test_load_balancer.id)
self.assertEqual(EXAMPLE['listeners'], test_load_balancer.listeners)
self.assertEqual(EXAMPLE['name'], test_load_balancer.name)
self.assertEqual(EXAMPLE['operating_status'],
test_load_balancer.operating_status)
self.assertEqual(
EXAMPLE['operating_status'], test_load_balancer.operating_status
)
self.assertEqual(EXAMPLE['pools'], test_load_balancer.pools)
self.assertEqual(EXAMPLE['project_id'], test_load_balancer.project_id)
self.assertEqual(EXAMPLE['provider'], test_load_balancer.provider)
self.assertEqual(EXAMPLE['provisioning_status'],
test_load_balancer.provisioning_status)
self.assertEqual(
EXAMPLE['provisioning_status'],
test_load_balancer.provisioning_status,
)
self.assertEqual(EXAMPLE['updated_at'], test_load_balancer.updated_at)
self.assertEqual(EXAMPLE['vip_address'],
test_load_balancer.vip_address)
self.assertEqual(EXAMPLE['vip_network_id'],
test_load_balancer.vip_network_id)
self.assertEqual(EXAMPLE['vip_port_id'],
test_load_balancer.vip_port_id)
self.assertEqual(EXAMPLE['vip_subnet_id'],
test_load_balancer.vip_subnet_id)
self.assertEqual(EXAMPLE['vip_qos_policy_id'],
test_load_balancer.vip_qos_policy_id)
self.assertEqual(EXAMPLE['additional_vips'],
test_load_balancer.additional_vips)
self.assertEqual(
EXAMPLE['vip_address'], test_load_balancer.vip_address
)
self.assertEqual(
EXAMPLE['vip_network_id'], test_load_balancer.vip_network_id
)
self.assertEqual(
EXAMPLE['vip_port_id'], test_load_balancer.vip_port_id
)
self.assertEqual(
EXAMPLE['vip_subnet_id'], test_load_balancer.vip_subnet_id
)
self.assertEqual(
EXAMPLE['vip_qos_policy_id'], test_load_balancer.vip_qos_policy_id
)
self.assertEqual(
EXAMPLE['additional_vips'], test_load_balancer.additional_vips
)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'availability_zone': 'availability_zone',
'description': 'description',
'flavor_id': 'flavor_id',
'name': 'name',
'project_id': 'project_id',
'provider': 'provider',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'vip_address': 'vip_address',
'vip_network_id': 'vip_network_id',
'vip_port_id': 'vip_port_id',
'vip_subnet_id': 'vip_subnet_id',
'vip_qos_policy_id': 'vip_qos_policy_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
},
test_load_balancer._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'availability_zone': 'availability_zone',
'description': 'description',
'flavor_id': 'flavor_id',
'name': 'name',
'project_id': 'project_id',
'provider': 'provider',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'vip_address': 'vip_address',
'vip_network_id': 'vip_network_id',
'vip_port_id': 'vip_port_id',
'vip_subnet_id': 'vip_subnet_id',
'vip_qos_policy_id': 'vip_qos_policy_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
},
test_load_balancer._query_mapping._mapping,
)
def test_delete_non_cascade(self):
sess = mock.Mock()
@@ -138,12 +144,9 @@ class TestLoadBalancer(base.TestCase):
sot._translate_response = mock.Mock()
sot.delete(sess)
url = 'lbaas/loadbalancers/%(lb)s' % {
'lb': EXAMPLE['id']
}
url = 'lbaas/loadbalancers/%(lb)s' % {'lb': EXAMPLE['id']}
params = {}
sess.delete.assert_called_with(url,
params=params)
sess.delete.assert_called_with(url, params=params)
sot._translate_response.assert_called_once_with(
resp,
error_message=None,
@@ -160,12 +163,9 @@ class TestLoadBalancer(base.TestCase):
sot._translate_response = mock.Mock()
sot.delete(sess)
url = 'lbaas/loadbalancers/%(lb)s' % {
'lb': EXAMPLE['id']
}
url = 'lbaas/loadbalancers/%(lb)s' % {'lb': EXAMPLE['id']}
params = {'cascade': True}
sess.delete.assert_called_with(url,
params=params)
sess.delete.assert_called_with(url, params=params)
sot._translate_response.assert_called_once_with(
resp,
error_message=None,
@@ -174,12 +174,13 @@ class TestLoadBalancer(base.TestCase):
class TestLoadBalancerStats(base.TestCase):
def test_basic(self):
test_load_balancer = load_balancer.LoadBalancerStats()
self.assertEqual('stats', test_load_balancer.resource_key)
self.assertEqual('/lbaas/loadbalancers/%(lb_id)s/stats',
test_load_balancer.base_path)
self.assertEqual(
'/lbaas/loadbalancers/%(lb_id)s/stats',
test_load_balancer.base_path,
)
self.assertFalse(test_load_balancer.allow_create)
self.assertTrue(test_load_balancer.allow_fetch)
self.assertFalse(test_load_balancer.allow_delete)
@@ -188,24 +189,32 @@ class TestLoadBalancerStats(base.TestCase):
def test_make_it(self):
test_load_balancer = load_balancer.LoadBalancerStats(**EXAMPLE_STATS)
self.assertEqual(EXAMPLE_STATS['active_connections'],
test_load_balancer.active_connections)
self.assertEqual(EXAMPLE_STATS['bytes_in'],
test_load_balancer.bytes_in)
self.assertEqual(EXAMPLE_STATS['bytes_out'],
test_load_balancer.bytes_out)
self.assertEqual(EXAMPLE_STATS['request_errors'],
test_load_balancer.request_errors)
self.assertEqual(EXAMPLE_STATS['total_connections'],
test_load_balancer.total_connections)
self.assertEqual(
EXAMPLE_STATS['active_connections'],
test_load_balancer.active_connections,
)
self.assertEqual(
EXAMPLE_STATS['bytes_in'], test_load_balancer.bytes_in
)
self.assertEqual(
EXAMPLE_STATS['bytes_out'], test_load_balancer.bytes_out
)
self.assertEqual(
EXAMPLE_STATS['request_errors'], test_load_balancer.request_errors
)
self.assertEqual(
EXAMPLE_STATS['total_connections'],
test_load_balancer.total_connections,
)
class TestLoadBalancerFailover(base.TestCase):
def test_basic(self):
test_load_balancer = load_balancer.LoadBalancerFailover()
self.assertEqual('/lbaas/loadbalancers/%(lb_id)s/failover',
test_load_balancer.base_path)
self.assertEqual(
'/lbaas/loadbalancers/%(lb_id)s/failover',
test_load_balancer.base_path,
)
self.assertFalse(test_load_balancer.allow_create)
self.assertFalse(test_load_balancer.allow_fetch)
self.assertFalse(test_load_balancer.allow_delete)

View File

@@ -34,13 +34,13 @@ EXAMPLE = {
class TestPoolMember(base.TestCase):
def test_basic(self):
test_member = member.Member()
self.assertEqual('member', test_member.resource_key)
self.assertEqual('members', test_member.resources_key)
self.assertEqual('/lbaas/pools/%(pool_id)s/members',
test_member.base_path)
self.assertEqual(
'/lbaas/pools/%(pool_id)s/members', test_member.base_path
)
self.assertTrue(test_member.allow_create)
self.assertTrue(test_member.allow_fetch)
self.assertTrue(test_member.allow_commit)
@@ -52,8 +52,9 @@ class TestPoolMember(base.TestCase):
self.assertEqual(EXAMPLE['address'], test_member.address)
self.assertTrue(test_member.is_admin_state_up)
self.assertEqual(EXAMPLE['id'], test_member.id)
self.assertEqual(EXAMPLE['monitor_address'],
test_member.monitor_address)
self.assertEqual(
EXAMPLE['monitor_address'], test_member.monitor_address
)
self.assertEqual(EXAMPLE['monitor_port'], test_member.monitor_port)
self.assertEqual(EXAMPLE['name'], test_member.name)
self.assertEqual(EXAMPLE['pool_id'], test_member.pool_id)
@@ -64,26 +65,27 @@ class TestPoolMember(base.TestCase):
self.assertFalse(test_member.backup)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'created_at': 'created_at',
'updated_at': 'updated_at',
'name': 'name',
'project_id': 'project_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'address': 'address',
'protocol_port': 'protocol_port',
'subnet_id': 'subnet_id',
'weight': 'weight',
'monitor_address': 'monitor_address',
'monitor_port': 'monitor_port',
'backup': 'backup'
},
test_member._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'created_at': 'created_at',
'updated_at': 'updated_at',
'name': 'name',
'project_id': 'project_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'address': 'address',
'protocol_port': 'protocol_port',
'subnet_id': 'subnet_id',
'weight': 'weight',
'monitor_address': 'monitor_address',
'monitor_port': 'monitor_port',
'backup': 'backup',
},
test_member._query_mapping._mapping,
)

View File

@@ -44,7 +44,6 @@ EXAMPLE = {
class TestPool(base.TestCase):
def test_basic(self):
test_pool = pool.Pool()
self.assertEqual('pool', test_pool.resource_key)
@@ -59,66 +58,62 @@ class TestPool(base.TestCase):
def test_make_it(self):
test_pool = pool.Pool(**EXAMPLE)
self.assertEqual(EXAMPLE['name'], test_pool.name),
self.assertEqual(EXAMPLE['description'],
test_pool.description)
self.assertEqual(EXAMPLE['admin_state_up'],
test_pool.is_admin_state_up)
self.assertEqual(EXAMPLE['provisioning_status'],
test_pool.provisioning_status)
self.assertEqual(EXAMPLE['description'], test_pool.description)
self.assertEqual(
EXAMPLE['admin_state_up'], test_pool.is_admin_state_up
)
self.assertEqual(
EXAMPLE['provisioning_status'], test_pool.provisioning_status
)
self.assertEqual(EXAMPLE['protocol'], test_pool.protocol)
self.assertEqual(EXAMPLE['operating_status'],
test_pool.operating_status)
self.assertEqual(
EXAMPLE['operating_status'], test_pool.operating_status
)
self.assertEqual(EXAMPLE['listener_id'], test_pool.listener_id)
self.assertEqual(EXAMPLE['loadbalancer_id'],
test_pool.loadbalancer_id)
self.assertEqual(EXAMPLE['lb_algorithm'],
test_pool.lb_algorithm)
self.assertEqual(EXAMPLE['session_persistence'],
test_pool.session_persistence)
self.assertEqual(EXAMPLE['project_id'],
test_pool.project_id)
self.assertEqual(EXAMPLE['loadbalancers'],
test_pool.loadbalancers)
self.assertEqual(EXAMPLE['listeners'],
test_pool.listeners)
self.assertEqual(EXAMPLE['loadbalancer_id'], test_pool.loadbalancer_id)
self.assertEqual(EXAMPLE['lb_algorithm'], test_pool.lb_algorithm)
self.assertEqual(
EXAMPLE['session_persistence'], test_pool.session_persistence
)
self.assertEqual(EXAMPLE['project_id'], test_pool.project_id)
self.assertEqual(EXAMPLE['loadbalancers'], test_pool.loadbalancers)
self.assertEqual(EXAMPLE['listeners'], test_pool.listeners)
self.assertEqual(EXAMPLE['created_at'], test_pool.created_at)
self.assertEqual(EXAMPLE['updated_at'], test_pool.updated_at)
self.assertEqual(EXAMPLE['health_monitor_id'],
test_pool.health_monitor_id)
self.assertEqual(
EXAMPLE['health_monitor_id'], test_pool.health_monitor_id
)
self.assertEqual(EXAMPLE['members'], test_pool.members)
self.assertEqual(EXAMPLE['tls_enabled'],
test_pool.tls_enabled)
self.assertEqual(EXAMPLE['tls_ciphers'],
test_pool.tls_ciphers)
self.assertEqual(EXAMPLE['tls_versions'],
test_pool.tls_versions)
self.assertEqual(EXAMPLE['alpn_protocols'],
test_pool.alpn_protocols)
self.assertEqual(EXAMPLE['tls_enabled'], test_pool.tls_enabled)
self.assertEqual(EXAMPLE['tls_ciphers'], test_pool.tls_ciphers)
self.assertEqual(EXAMPLE['tls_versions'], test_pool.tls_versions)
self.assertEqual(EXAMPLE['alpn_protocols'], test_pool.alpn_protocols)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'created_at': 'created_at',
'updated_at': 'updated_at',
'description': 'description',
'name': 'name',
'project_id': 'project_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'health_monitor_id': 'health_monitor_id',
'lb_algorithm': 'lb_algorithm',
'listener_id': 'listener_id',
'loadbalancer_id': 'loadbalancer_id',
'protocol': 'protocol',
'tls_enabled': 'tls_enabled',
'tls_ciphers': 'tls_ciphers',
'tls_versions': 'tls_versions',
'alpn_protocols': 'alpn_protocols',
},
test_pool._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'created_at': 'created_at',
'updated_at': 'updated_at',
'description': 'description',
'name': 'name',
'project_id': 'project_id',
'tags': 'tags',
'any_tags': 'tags-any',
'not_tags': 'not-tags',
'not_any_tags': 'not-tags-any',
'operating_status': 'operating_status',
'provisioning_status': 'provisioning_status',
'is_admin_state_up': 'admin_state_up',
'health_monitor_id': 'health_monitor_id',
'lb_algorithm': 'lb_algorithm',
'listener_id': 'listener_id',
'loadbalancer_id': 'loadbalancer_id',
'protocol': 'protocol',
'tls_enabled': 'tls_enabled',
'tls_ciphers': 'tls_ciphers',
'tls_versions': 'tls_versions',
'alpn_protocols': 'alpn_protocols',
},
test_pool._query_mapping._mapping,
)

View File

@@ -16,14 +16,10 @@ from openstack.load_balancer.v2 import provider
from openstack.tests.unit import base
EXAMPLE = {
'name': 'best',
'description': 'The best provider'
}
EXAMPLE = {'name': 'best', 'description': 'The best provider'}
class TestProvider(base.TestCase):
def test_basic(self):
test_provider = provider.Provider()
self.assertEqual('providers', test_provider.resources_key)
@@ -40,20 +36,24 @@ class TestProvider(base.TestCase):
self.assertEqual(EXAMPLE['description'], test_provider.description)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'name': 'name',
'description': 'description'},
test_provider._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'name': 'name',
'description': 'description',
},
test_provider._query_mapping._mapping,
)
class TestProviderFlavorCapabilities(base.TestCase):
def test_basic(self):
test_flav_cap = provider.ProviderFlavorCapabilities()
self.assertEqual('flavor_capabilities', test_flav_cap.resources_key)
self.assertEqual('/lbaas/providers/%(provider)s/flavor_capabilities',
test_flav_cap.base_path)
self.assertEqual(
'/lbaas/providers/%(provider)s/flavor_capabilities',
test_flav_cap.base_path,
)
self.assertFalse(test_flav_cap.allow_create)
self.assertFalse(test_flav_cap.allow_fetch)
self.assertFalse(test_flav_cap.allow_commit)
@@ -66,8 +66,11 @@ class TestProviderFlavorCapabilities(base.TestCase):
self.assertEqual(EXAMPLE['description'], test_flav_cap.description)
self.assertDictEqual(
{'limit': 'limit',
'marker': 'marker',
'name': 'name',
'description': 'description'},
test_flav_cap._query_mapping._mapping)
{
'limit': 'limit',
'marker': 'marker',
'name': 'name',
'description': 'description',
},
test_flav_cap._query_mapping._mapping,
)

View File

@@ -29,7 +29,6 @@ EXAMPLE = {
class TestQuota(base.TestCase):
def test_basic(self):
sot = quota.Quota()
self.assertEqual('quota', sot.resource_key)
@@ -58,7 +57,6 @@ class TestQuota(base.TestCase):
class TestQuotaDefault(base.TestCase):
def test_basic(self):
sot = quota.QuotaDefault()
self.assertEqual('quota', sot.resource_key)

View File

@@ -23,7 +23,6 @@ EXAMPLE = {
class TestVersion(base.TestCase):
def test_basic(self):
sot = version.Version()
self.assertEqual('version', sot.resource_key)

View File

@@ -46,24 +46,22 @@ class TestLoadBalancerProxy(test_proxy_base.TestProxyBase):
self.proxy = _proxy.Proxy(self.session)
def test_load_balancers(self):
self.verify_list(self.proxy.load_balancers,
lb.LoadBalancer)
self.verify_list(self.proxy.load_balancers, lb.LoadBalancer)
def test_load_balancer_get(self):
self.verify_get(self.proxy.get_load_balancer,
lb.LoadBalancer)
self.verify_get(self.proxy.get_load_balancer, lb.LoadBalancer)
def test_load_balancer_stats_get(self):
self.verify_get(self.proxy.get_load_balancer_statistics,
lb.LoadBalancerStats,
method_args=[self.LB_ID],
expected_args=[],
expected_kwargs={'lb_id': self.LB_ID,
'requires_id': False})
self.verify_get(
self.proxy.get_load_balancer_statistics,
lb.LoadBalancerStats,
method_args=[self.LB_ID],
expected_args=[],
expected_kwargs={'lb_id': self.LB_ID, 'requires_id': False},
)
def test_load_balancer_create(self):
self.verify_create(self.proxy.create_load_balancer,
lb.LoadBalancer)
self.verify_create(self.proxy.create_load_balancer, lb.LoadBalancer)
@mock.patch.object(proxy_base.Proxy, '_get_resource')
def test_load_balancer_delete_non_cascade(self, mock_get_resource):
@@ -75,10 +73,12 @@ class TestLoadBalancerProxy(test_proxy_base.TestProxyBase):
self.proxy.delete_load_balancer,
method_args=["resource_or_id", True, False],
expected_args=[lb.LoadBalancer, fake_load_balancer],
expected_kwargs={"ignore_missing": True})
expected_kwargs={"ignore_missing": True},
)
self.assertFalse(fake_load_balancer.cascade)
mock_get_resource.assert_called_once_with(lb.LoadBalancer,
"resource_or_id")
mock_get_resource.assert_called_once_with(
lb.LoadBalancer, "resource_or_id"
)
@mock.patch.object(proxy_base.Proxy, '_get_resource')
def test_load_balancer_delete_cascade(self, mock_get_resource):
@@ -90,108 +90,108 @@ class TestLoadBalancerProxy(test_proxy_base.TestProxyBase):
self.proxy.delete_load_balancer,
method_args=["resource_or_id", True, True],
expected_args=[lb.LoadBalancer, fake_load_balancer],
expected_kwargs={"ignore_missing": True})
expected_kwargs={"ignore_missing": True},
)
self.assertTrue(fake_load_balancer.cascade)
mock_get_resource.assert_called_once_with(lb.LoadBalancer,
"resource_or_id")
mock_get_resource.assert_called_once_with(
lb.LoadBalancer, "resource_or_id"
)
def test_load_balancer_find(self):
self.verify_find(self.proxy.find_load_balancer,
lb.LoadBalancer)
self.verify_find(self.proxy.find_load_balancer, lb.LoadBalancer)
def test_load_balancer_update(self):
self.verify_update(self.proxy.update_load_balancer,
lb.LoadBalancer)
self.verify_update(self.proxy.update_load_balancer, lb.LoadBalancer)
def test_load_balancer_failover(self):
self.verify_update(self.proxy.failover_load_balancer,
lb.LoadBalancerFailover,
method_args=[self.LB_ID],
expected_args=[],
expected_kwargs={'lb_id': self.LB_ID})
self.verify_update(
self.proxy.failover_load_balancer,
lb.LoadBalancerFailover,
method_args=[self.LB_ID],
expected_args=[],
expected_kwargs={'lb_id': self.LB_ID},
)
def test_listeners(self):
self.verify_list(self.proxy.listeners,
listener.Listener)
self.verify_list(self.proxy.listeners, listener.Listener)
def test_listener_get(self):
self.verify_get(self.proxy.get_listener,
listener.Listener)
self.verify_get(self.proxy.get_listener, listener.Listener)
def test_listener_stats_get(self):
self.verify_get(self.proxy.get_listener_statistics,
listener.ListenerStats,
method_args=[self.LISTENER_ID],
expected_args=[],
expected_kwargs={'listener_id': self.LISTENER_ID,
'requires_id': False})
self.verify_get(
self.proxy.get_listener_statistics,
listener.ListenerStats,
method_args=[self.LISTENER_ID],
expected_args=[],
expected_kwargs={
'listener_id': self.LISTENER_ID,
'requires_id': False,
},
)
def test_listener_create(self):
self.verify_create(self.proxy.create_listener,
listener.Listener)
self.verify_create(self.proxy.create_listener, listener.Listener)
def test_listener_delete(self):
self.verify_delete(self.proxy.delete_listener,
listener.Listener, True)
self.verify_delete(self.proxy.delete_listener, listener.Listener, True)
def test_listener_find(self):
self.verify_find(self.proxy.find_listener,
listener.Listener)
self.verify_find(self.proxy.find_listener, listener.Listener)
def test_listener_update(self):
self.verify_update(self.proxy.update_listener,
listener.Listener)
self.verify_update(self.proxy.update_listener, listener.Listener)
def test_pools(self):
self.verify_list(self.proxy.pools,
pool.Pool)
self.verify_list(self.proxy.pools, pool.Pool)
def test_pool_get(self):
self.verify_get(self.proxy.get_pool,
pool.Pool)
self.verify_get(self.proxy.get_pool, pool.Pool)
def test_pool_create(self):
self.verify_create(self.proxy.create_pool,
pool.Pool)
self.verify_create(self.proxy.create_pool, pool.Pool)
def test_pool_delete(self):
self.verify_delete(self.proxy.delete_pool,
pool.Pool, True)
self.verify_delete(self.proxy.delete_pool, pool.Pool, True)
def test_pool_find(self):
self.verify_find(self.proxy.find_pool,
pool.Pool)
self.verify_find(self.proxy.find_pool, pool.Pool)
def test_pool_update(self):
self.verify_update(self.proxy.update_pool,
pool.Pool)
self.verify_update(self.proxy.update_pool, pool.Pool)
def test_members(self):
self.verify_list(self.proxy.members,
member.Member,
method_kwargs={'pool': self.POOL_ID},
expected_kwargs={'pool_id': self.POOL_ID})
self.verify_list(
self.proxy.members,
member.Member,
method_kwargs={'pool': self.POOL_ID},
expected_kwargs={'pool_id': self.POOL_ID},
)
def test_member_get(self):
self.verify_get(self.proxy.get_member,
member.Member,
method_kwargs={'pool': self.POOL_ID},
expected_kwargs={'pool_id': self.POOL_ID})
self.verify_get(
self.proxy.get_member,
member.Member,
method_kwargs={'pool': self.POOL_ID},
expected_kwargs={'pool_id': self.POOL_ID},
)
def test_member_create(self):
self.verify_create(self.proxy.create_member,
member.Member,
method_kwargs={'pool': self.POOL_ID},
expected_kwargs={'pool_id': self.POOL_ID})
self.verify_create(
self.proxy.create_member,
member.Member,
method_kwargs={'pool': self.POOL_ID},
expected_kwargs={'pool_id': self.POOL_ID},
)
def test_member_delete(self):
self.verify_delete(self.proxy.delete_member,
member.Member,
ignore_missing=True,
method_kwargs={'pool': self.POOL_ID},
expected_kwargs={
'pool_id': self.POOL_ID,
'ignore_missing': True})
self.verify_delete(
self.proxy.delete_member,
member.Member,
ignore_missing=True,
method_kwargs={'pool': self.POOL_ID},
expected_kwargs={'pool_id': self.POOL_ID, 'ignore_missing': True},
)
def test_member_find(self):
self._verify(
@@ -199,7 +199,8 @@ class TestLoadBalancerProxy(test_proxy_base.TestProxyBase):
self.proxy.find_member,
method_args=["MEMBER", self.POOL_ID],
expected_args=[member.Member, "MEMBER"],
expected_kwargs={"pool_id": self.POOL_ID, "ignore_missing": True})
expected_kwargs={"pool_id": self.POOL_ID, "ignore_missing": True},
)
def test_member_update(self):
self._verify(
@@ -207,80 +208,93 @@ class TestLoadBalancerProxy(test_proxy_base.TestProxyBase):
self.proxy.update_member,
method_args=["MEMBER", self.POOL_ID],
expected_args=[member.Member, "MEMBER"],
expected_kwargs={"pool_id": self.POOL_ID})
expected_kwargs={"pool_id": self.POOL_ID},
)
def test_health_monitors(self):
self.verify_list(self.proxy.health_monitors,
health_monitor.HealthMonitor)
self.verify_list(
self.proxy.health_monitors, health_monitor.HealthMonitor
)
def test_health_monitor_get(self):
self.verify_get(self.proxy.get_health_monitor,
health_monitor.HealthMonitor)
self.verify_get(
self.proxy.get_health_monitor, health_monitor.HealthMonitor
)
def test_health_monitor_create(self):
self.verify_create(self.proxy.create_health_monitor,
health_monitor.HealthMonitor)
self.verify_create(
self.proxy.create_health_monitor, health_monitor.HealthMonitor
)
def test_health_monitor_delete(self):
self.verify_delete(self.proxy.delete_health_monitor,
health_monitor.HealthMonitor, True)
self.verify_delete(
self.proxy.delete_health_monitor,
health_monitor.HealthMonitor,
True,
)
def test_health_monitor_find(self):
self.verify_find(self.proxy.find_health_monitor,
health_monitor.HealthMonitor)
self.verify_find(
self.proxy.find_health_monitor, health_monitor.HealthMonitor
)
def test_health_monitor_update(self):
self.verify_update(self.proxy.update_health_monitor,
health_monitor.HealthMonitor)
self.verify_update(
self.proxy.update_health_monitor, health_monitor.HealthMonitor
)
def test_l7_policies(self):
self.verify_list(self.proxy.l7_policies,
l7_policy.L7Policy)
self.verify_list(self.proxy.l7_policies, l7_policy.L7Policy)
def test_l7_policy_get(self):
self.verify_get(self.proxy.get_l7_policy,
l7_policy.L7Policy)
self.verify_get(self.proxy.get_l7_policy, l7_policy.L7Policy)
def test_l7_policy_create(self):
self.verify_create(self.proxy.create_l7_policy,
l7_policy.L7Policy)
self.verify_create(self.proxy.create_l7_policy, l7_policy.L7Policy)
def test_l7_policy_delete(self):
self.verify_delete(self.proxy.delete_l7_policy,
l7_policy.L7Policy, True)
self.verify_delete(
self.proxy.delete_l7_policy, l7_policy.L7Policy, True
)
def test_l7_policy_find(self):
self.verify_find(self.proxy.find_l7_policy,
l7_policy.L7Policy)
self.verify_find(self.proxy.find_l7_policy, l7_policy.L7Policy)
def test_l7_policy_update(self):
self.verify_update(self.proxy.update_l7_policy,
l7_policy.L7Policy)
self.verify_update(self.proxy.update_l7_policy, l7_policy.L7Policy)
def test_l7_rules(self):
self.verify_list(self.proxy.l7_rules,
l7_rule.L7Rule,
method_kwargs={'l7_policy': self.L7_POLICY_ID},
expected_kwargs={'l7policy_id': self.L7_POLICY_ID})
self.verify_list(
self.proxy.l7_rules,
l7_rule.L7Rule,
method_kwargs={'l7_policy': self.L7_POLICY_ID},
expected_kwargs={'l7policy_id': self.L7_POLICY_ID},
)
def test_l7_rule_get(self):
self.verify_get(self.proxy.get_l7_rule,
l7_rule.L7Rule,
method_kwargs={'l7_policy': self.L7_POLICY_ID},
expected_kwargs={'l7policy_id': self.L7_POLICY_ID})
self.verify_get(
self.proxy.get_l7_rule,
l7_rule.L7Rule,
method_kwargs={'l7_policy': self.L7_POLICY_ID},
expected_kwargs={'l7policy_id': self.L7_POLICY_ID},
)
def test_l7_rule_create(self):
self.verify_create(self.proxy.create_l7_rule,
l7_rule.L7Rule,
method_kwargs={'l7_policy': self.L7_POLICY_ID},
expected_kwargs={'l7policy_id': self.L7_POLICY_ID})
self.verify_create(
self.proxy.create_l7_rule,
l7_rule.L7Rule,
method_kwargs={'l7_policy': self.L7_POLICY_ID},
expected_kwargs={'l7policy_id': self.L7_POLICY_ID},
)
def test_l7_rule_delete(self):
self.verify_delete(self.proxy.delete_l7_rule,
l7_rule.L7Rule,
ignore_missing=True,
method_kwargs={'l7_policy': self.L7_POLICY_ID},
expected_kwargs={'l7policy_id': self.L7_POLICY_ID})
self.verify_delete(
self.proxy.delete_l7_rule,
l7_rule.L7Rule,
ignore_missing=True,
method_kwargs={'l7_policy': self.L7_POLICY_ID},
expected_kwargs={'l7policy_id': self.L7_POLICY_ID},
)
def test_l7_rule_find(self):
self._verify(
@@ -289,7 +303,10 @@ class TestLoadBalancerProxy(test_proxy_base.TestProxyBase):
method_args=["RULE", self.L7_POLICY_ID],
expected_args=[l7_rule.L7Rule, "RULE"],
expected_kwargs={
"l7policy_id": self.L7_POLICY_ID, "ignore_missing": True})
"l7policy_id": self.L7_POLICY_ID,
"ignore_missing": True,
},
)
def test_l7_rule_update(self):
self._verify(
@@ -297,7 +314,8 @@ class TestLoadBalancerProxy(test_proxy_base.TestProxyBase):
self.proxy.update_l7_rule,
method_args=["RULE", self.L7_POLICY_ID],
expected_args=[l7_rule.L7Rule, "RULE"],
expected_kwargs={"l7policy_id": self.L7_POLICY_ID})
expected_kwargs={"l7policy_id": self.L7_POLICY_ID},
)
def test_quotas(self):
self.verify_list(self.proxy.quotas, quota.Quota)
@@ -313,7 +331,8 @@ class TestLoadBalancerProxy(test_proxy_base.TestProxyBase):
"openstack.proxy.Proxy._get",
self.proxy.get_quota_default,
expected_args=[quota.QuotaDefault],
expected_kwargs={'requires_id': False})
expected_kwargs={'requires_id': False},
)
def test_quota_delete(self):
self.verify_delete(self.proxy.delete_quota, quota.Quota, False)
@@ -325,35 +344,45 @@ class TestLoadBalancerProxy(test_proxy_base.TestProxyBase):
self.verify_list(self.proxy.providers, provider.Provider)
def test_provider_flavor_capabilities(self):
self.verify_list(self.proxy.provider_flavor_capabilities,
provider.ProviderFlavorCapabilities,
method_args=[self.AMPHORA],
expected_args=[],
expected_kwargs={'provider': self.AMPHORA})
self.verify_list(
self.proxy.provider_flavor_capabilities,
provider.ProviderFlavorCapabilities,
method_args=[self.AMPHORA],
expected_args=[],
expected_kwargs={'provider': self.AMPHORA},
)
def test_flavor_profiles(self):
self.verify_list(self.proxy.flavor_profiles,
flavor_profile.FlavorProfile)
self.verify_list(
self.proxy.flavor_profiles, flavor_profile.FlavorProfile
)
def test_flavor_profile_get(self):
self.verify_get(self.proxy.get_flavor_profile,
flavor_profile.FlavorProfile)
self.verify_get(
self.proxy.get_flavor_profile, flavor_profile.FlavorProfile
)
def test_flavor_profile_create(self):
self.verify_create(self.proxy.create_flavor_profile,
flavor_profile.FlavorProfile)
self.verify_create(
self.proxy.create_flavor_profile, flavor_profile.FlavorProfile
)
def test_flavor_profile_delete(self):
self.verify_delete(self.proxy.delete_flavor_profile,
flavor_profile.FlavorProfile, True)
self.verify_delete(
self.proxy.delete_flavor_profile,
flavor_profile.FlavorProfile,
True,
)
def test_flavor_profile_find(self):
self.verify_find(self.proxy.find_flavor_profile,
flavor_profile.FlavorProfile)
self.verify_find(
self.proxy.find_flavor_profile, flavor_profile.FlavorProfile
)
def test_flavor_profile_update(self):
self.verify_update(self.proxy.update_flavor_profile,
flavor_profile.FlavorProfile)
self.verify_update(
self.proxy.update_flavor_profile, flavor_profile.FlavorProfile
)
def test_flavors(self):
self.verify_list(self.proxy.flavors, flavor.Flavor)
@@ -383,64 +412,92 @@ class TestLoadBalancerProxy(test_proxy_base.TestProxyBase):
self.verify_find(self.proxy.find_amphora, amphora.Amphora)
def test_amphora_configure(self):
self.verify_update(self.proxy.configure_amphora,
amphora.AmphoraConfig,
method_args=[self.AMPHORA_ID],
expected_args=[],
expected_kwargs={'amphora_id': self.AMPHORA_ID})
self.verify_update(
self.proxy.configure_amphora,
amphora.AmphoraConfig,
method_args=[self.AMPHORA_ID],
expected_args=[],
expected_kwargs={'amphora_id': self.AMPHORA_ID},
)
def test_amphora_failover(self):
self.verify_update(self.proxy.failover_amphora,
amphora.AmphoraFailover,
method_args=[self.AMPHORA_ID],
expected_args=[],
expected_kwargs={'amphora_id': self.AMPHORA_ID})
self.verify_update(
self.proxy.failover_amphora,
amphora.AmphoraFailover,
method_args=[self.AMPHORA_ID],
expected_args=[],
expected_kwargs={'amphora_id': self.AMPHORA_ID},
)
def test_availability_zone_profiles(self):
self.verify_list(self.proxy.availability_zone_profiles,
availability_zone_profile.AvailabilityZoneProfile)
self.verify_list(
self.proxy.availability_zone_profiles,
availability_zone_profile.AvailabilityZoneProfile,
)
def test_availability_zone_profile_get(self):
self.verify_get(self.proxy.get_availability_zone_profile,
availability_zone_profile.AvailabilityZoneProfile)
self.verify_get(
self.proxy.get_availability_zone_profile,
availability_zone_profile.AvailabilityZoneProfile,
)
def test_availability_zone_profile_create(self):
self.verify_create(self.proxy.create_availability_zone_profile,
availability_zone_profile.AvailabilityZoneProfile)
self.verify_create(
self.proxy.create_availability_zone_profile,
availability_zone_profile.AvailabilityZoneProfile,
)
def test_availability_zone_profile_delete(self):
self.verify_delete(self.proxy.delete_availability_zone_profile,
availability_zone_profile.AvailabilityZoneProfile,
True)
self.verify_delete(
self.proxy.delete_availability_zone_profile,
availability_zone_profile.AvailabilityZoneProfile,
True,
)
def test_availability_zone_profile_find(self):
self.verify_find(self.proxy.find_availability_zone_profile,
availability_zone_profile.AvailabilityZoneProfile)
self.verify_find(
self.proxy.find_availability_zone_profile,
availability_zone_profile.AvailabilityZoneProfile,
)
def test_availability_zone_profile_update(self):
self.verify_update(self.proxy.update_availability_zone_profile,
availability_zone_profile.AvailabilityZoneProfile)
self.verify_update(
self.proxy.update_availability_zone_profile,
availability_zone_profile.AvailabilityZoneProfile,
)
def test_availability_zones(self):
self.verify_list(self.proxy.availability_zones,
availability_zone.AvailabilityZone)
self.verify_list(
self.proxy.availability_zones, availability_zone.AvailabilityZone
)
def test_availability_zone_get(self):
self.verify_get(self.proxy.get_availability_zone,
availability_zone.AvailabilityZone)
self.verify_get(
self.proxy.get_availability_zone,
availability_zone.AvailabilityZone,
)
def test_availability_zone_create(self):
self.verify_create(self.proxy.create_availability_zone,
availability_zone.AvailabilityZone)
self.verify_create(
self.proxy.create_availability_zone,
availability_zone.AvailabilityZone,
)
def test_availability_zone_delete(self):
self.verify_delete(self.proxy.delete_availability_zone,
availability_zone.AvailabilityZone, True)
self.verify_delete(
self.proxy.delete_availability_zone,
availability_zone.AvailabilityZone,
True,
)
def test_availability_zone_find(self):
self.verify_find(self.proxy.find_availability_zone,
availability_zone.AvailabilityZone)
self.verify_find(
self.proxy.find_availability_zone,
availability_zone.AvailabilityZone,
)
def test_availability_zone_update(self):
self.verify_update(self.proxy.update_availability_zone,
availability_zone.AvailabilityZone)
self.verify_update(
self.proxy.update_availability_zone,
availability_zone.AvailabilityZone,
)