Merge "Add v2 load balancer type and controllers"

This commit is contained in:
Jenkins 2017-03-16 18:33:17 +00:00 committed by Gerrit Code Review
commit 1ce3234dc0
9 changed files with 1288 additions and 59 deletions

View File

@ -89,6 +89,12 @@ class BaseType(wtypes.Base):
:param render_unsets: If True, will convert items that are WSME Unset
types to None. If False, does not add the item
"""
# Set project_id equal tenant_id if project_id is unset and tenant_id
# is
if hasattr(self, 'project_id') and hasattr(self, 'tenant_id'):
if (isinstance(self.project_id, wtypes.UnsetType) and
not isinstance(self.tenant_id, wtypes.UnsetType)):
self.project_id = self.tenant_id
ret_dict = {}
for attr in dir(self):
if attr.startswith('_'):
@ -99,7 +105,6 @@ class BaseType(wtypes.Base):
# wsme.rest.json.fromjson and using the @fromjson.when_object
# decorator.
if attr == 'tenant_id':
ret_dict['project_id'] = value
continue
if value and callable(value):
continue

View File

@ -12,16 +12,17 @@
# License for the specific language governing permissions and limitations
# under the License.
import pecan
from wsme import types as wtypes
from wsmeext import pecan as wsme_pecan
from octavia.api.v1.controllers import load_balancer
from octavia.api.v2.controllers import base
from octavia.api.v2.controllers import load_balancer
class BaseV2Controller(base.BaseController):
loadbalancers = load_balancer.LoadBalancersController()
@pecan.expose()
@wsme_pecan.wsexpose(wtypes.text)
def get(self):
return "v2.0"

View File

@ -0,0 +1,173 @@
# Copyright 2014 Rackspace
# Copyright 2016 Blue Box, an IBM Company
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_db import exception as odb_exceptions
from oslo_log import log as logging
from oslo_utils import excutils
import pecan
from wsme import types as wtypes
from wsmeext import pecan as wsme_pecan
from octavia.api.v2.controllers import base
from octavia.api.v2.types import load_balancer as lb_types
from octavia.common import constants
from octavia.common import data_models
from octavia.common import exceptions
import octavia.common.validate as validate
from octavia.db import api as db_api
from octavia.db import prepare as db_prepare
from octavia.i18n import _LI
LOG = logging.getLogger(__name__)
class LoadBalancersController(base.BaseController):
def __init__(self):
super(LoadBalancersController, self).__init__()
self.handler = self.handler.load_balancer
@wsme_pecan.wsexpose(lb_types.LoadBalancerRootResponse, wtypes.text)
def get_one(self, id):
"""Gets a single load balancer's details."""
context = pecan.request.context.get('octavia_context')
load_balancer = self._get_db_lb(context.session, id)
result = self._convert_db_to_type(load_balancer,
lb_types.LoadBalancerResponse)
return lb_types.LoadBalancerRootResponse(loadbalancer=result)
@wsme_pecan.wsexpose(lb_types.LoadBalancersRootResponse, wtypes.text,
wtypes.text)
def get_all(self, tenant_id=None, project_id=None):
"""Lists all load balancers."""
# NOTE(blogan): tenant_id and project_id are optional query parameters
# tenant_id and project_id are the same thing. tenant_id will be kept
# around for a long amount of time.
context = pecan.request.context.get('octavia_context')
project_id = context.project_id or project_id or tenant_id
load_balancers = self.repositories.load_balancer.get_all(
context.session, project_id=project_id)
result = self._convert_db_to_type(load_balancers,
[lb_types.LoadBalancerResponse])
return lb_types.LoadBalancersRootResponse(loadbalancers=result)
def _test_lb_status(self, session, id, lb_status=constants.PENDING_UPDATE):
"""Verify load balancer is in a mutable state."""
lb_repo = self.repositories.load_balancer
if not lb_repo.test_and_set_provisioning_status(
session, id, lb_status):
prov_status = lb_repo.get(session, id=id).provisioning_status
LOG.info(_LI(
"Invalid state %(state)s of loadbalancer resource %(id)s"),
{"state": prov_status, "id": id})
raise exceptions.LBPendingStateError(
state=prov_status, id=id)
@wsme_pecan.wsexpose(lb_types.LoadBalancerRootResponse,
body=lb_types.LoadBalancerRootPOST, status_code=202)
def post(self, load_balancer):
"""Creates a load balancer."""
load_balancer = load_balancer.loadbalancer
context = pecan.request.context.get('octavia_context')
# Validate the subnet id
if load_balancer.vip_subnet_id:
if not validate.subnet_exists(load_balancer.vip_subnet_id):
raise exceptions.NotFound(resource='Subnet',
id=load_balancer.vip_subnet_id)
lock_session = db_api.get_session(autocommit=False)
if self.repositories.check_quota_met(
context.session,
lock_session,
data_models.LoadBalancer,
load_balancer.project_id):
lock_session.rollback()
raise exceptions.QuotaException
# TODO(blogan): lb graph, look at v1 code
try:
lb_dict = db_prepare.create_load_balancer(load_balancer.to_dict(
render_unsets=True
))
vip_dict = lb_dict.pop('vip', {})
db_lb = self.repositories.create_load_balancer_and_vip(
lock_session, lb_dict, vip_dict)
lock_session.commit()
except odb_exceptions.DBDuplicateEntry:
lock_session.rollback()
raise exceptions.IDAlreadyExists()
except Exception:
with excutils.save_and_reraise_exception():
lock_session.rollback()
# Handler will be responsible for sending to controller
try:
LOG.info(_LI("Sending created Load Balancer %s to the handler"),
db_lb.id)
self.handler.create(db_lb)
except Exception:
with excutils.save_and_reraise_exception(reraise=False):
self.repositories.load_balancer.update(
context.session, db_lb.id,
provisioning_status=constants.ERROR)
result = self._convert_db_to_type(db_lb, lb_types.LoadBalancerResponse)
return lb_types.LoadBalancerRootResponse(loadbalancer=result)
@wsme_pecan.wsexpose(lb_types.LoadBalancerRootResponse,
wtypes.text, status_code=200,
body=lb_types.LoadBalancerRootPUT)
def put(self, id, load_balancer):
"""Updates a load balancer."""
load_balancer = load_balancer.loadbalancer
context = pecan.request.context.get('octavia_context')
db_lb = self._get_db_lb(context.session, id)
self._test_lb_status(context.session, id)
try:
LOG.info(_LI("Sending updated Load Balancer %s to the handler"),
id)
self.handler.update(db_lb, load_balancer)
except Exception:
with excutils.save_and_reraise_exception(reraise=False):
self.repositories.load_balancer.update(
context.session, id, provisioning_status=constants.ERROR)
db_lb = self._get_db_lb(context.session, id)
result = self._convert_db_to_type(db_lb, lb_types.LoadBalancerResponse)
return lb_types.LoadBalancerRootResponse(loadbalancer=result)
def _delete(self, id, cascade=False):
"""Deletes a load balancer."""
context = pecan.request.context.get('octavia_context')
db_lb = self._get_db_lb(context.session, id)
self._test_lb_status(context.session, id,
lb_status=constants.PENDING_DELETE)
try:
LOG.info(_LI("Sending deleted Load Balancer %s to the handler"),
db_lb.id)
self.handler.delete(db_lb, cascade)
except Exception:
with excutils.save_and_reraise_exception(reraise=False):
self.repositories.load_balancer.update(
context.session, db_lb.id,
provisioning_status=constants.ERROR)
result = self._convert_db_to_type(db_lb, lb_types.LoadBalancerResponse)
return lb_types.LoadBalancersRootResponse(loadbalancer=result)
@wsme_pecan.wsexpose(None, wtypes.text, status_code=204)
def delete(self, id):
"""Deletes a load balancer."""
return self._delete(id)

View File

@ -0,0 +1,94 @@
# Copyright 2014 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from wsme import types as wtypes
from octavia.api.common import types
class BaseLoadBalancerType(types.BaseType):
_type_to_model_map = {'vip_address': 'vip.ip_address',
'vip_subnet_id': 'vip.subnet_id',
'vip_port_id': 'vip.port_id',
'vip_network_id': 'vip.network_id',
'admin_state_up': 'enabled'}
class LoadBalancerResponse(BaseLoadBalancerType):
"""Defines which attributes are to be shown on any response."""
id = wtypes.wsattr(wtypes.UuidType())
name = wtypes.wsattr(wtypes.StringType())
description = wtypes.wsattr(wtypes.StringType())
provisioning_status = wtypes.wsattr(wtypes.StringType())
operating_status = wtypes.wsattr(wtypes.StringType())
admin_state_up = wtypes.wsattr(bool)
project_id = wtypes.wsattr(wtypes.StringType())
tenant_id = wtypes.wsattr(wtypes.StringType())
created_at = wtypes.wsattr(wtypes.datetime.datetime)
updated_at = wtypes.wsattr(wtypes.datetime.datetime)
vip_address = wtypes.wsattr(types.IPAddressType())
vip_port_id = wtypes.wsattr(wtypes.UuidType())
vip_subnet_id = wtypes.wsattr(wtypes.UuidType())
# TODO(blogan): add listeners once that has been merged
# TODO(ankur-gupta-f): add pools once that has been merged
@classmethod
def from_data_model(cls, data_model, children=False):
result = super(BaseLoadBalancerType, cls).from_data_model(
data_model, children=children)
if data_model.vip:
result.vip_subnet_id = data_model.vip.subnet_id
result.vip_port_id = data_model.vip.port_id
result.vip_address = data_model.vip.ip_address
result.vip_network_id = data_model.vip.network_id
result.tenant_id = data_model.project_id
return result
class LoadBalancerRootResponse(types.BaseType):
loadbalancer = wtypes.wsattr(LoadBalancerResponse)
class LoadBalancersRootResponse(types.BaseType):
loadbalancers = wtypes.wsattr([LoadBalancerResponse])
class LoadBalancerPOST(BaseLoadBalancerType):
"""Defines mandatory and optional attributes of a POST request."""
name = wtypes.wsattr(wtypes.StringType(max_length=255))
description = wtypes.wsattr(wtypes.StringType(max_length=255))
admin_state_up = wtypes.wsattr(bool, default=True)
vip_address = wtypes.wsattr(types.IPAddressType())
vip_port_id = wtypes.wsattr(wtypes.UuidType())
vip_subnet_id = wtypes.wsattr(wtypes.UuidType())
vip_network_id = wtypes.wsattr(wtypes.UuidType())
project_id = wtypes.wsattr(wtypes.StringType(max_length=36))
tenant_id = wtypes.wsattr(wtypes.StringType(max_length=36))
class LoadBalancerRootPOST(types.BaseType):
loadbalancer = wtypes.wsattr(LoadBalancerPOST)
class LoadBalancerPUT(BaseLoadBalancerType):
"""Defines attributes that are acceptable of a PUT request."""
name = wtypes.wsattr(wtypes.StringType(max_length=255))
description = wtypes.wsattr(wtypes.StringType(max_length=255))
admin_state_up = wtypes.wsattr(bool)
class LoadBalancerRootPUT(types.BaseType):
loadbalancer = wtypes.wsattr(LoadBalancerPUT)

View File

@ -148,6 +148,11 @@ class ImmutableObject(APIException):
code = 409
class LBPendingStateError(APIException):
msg = _("Invalid state %(state)s of loadbalancer resource %(id)s")
code = 409
class TooManyL7RulesOnL7Policy(APIException):
msg = _("Too many rules on L7 policy %(id)s")
code = 409

View File

@ -75,6 +75,7 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
'handler.SimulatedControllerHandler')
self.handler_mock = patcher.start()
self.app = self._make_app()
self.project_id = uuidutils.generate_uuid()
def reset_pecan():
patcher.stop()
@ -89,7 +90,7 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
def _get_full_path(self, path):
return ''.join([self.BASE_PATH, path])
def delete(self, path, headers=None, status=202, expect_errors=False):
def delete(self, path, headers=None, status=204, expect_errors=False):
headers = headers or {}
full_path = self._get_full_path(path)
response = self.app.delete(full_path,
@ -108,7 +109,7 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
expect_errors=expect_errors)
return response
def put(self, path, body, headers=None, status=202, expect_errors=False):
def put(self, path, body, headers=None, status=200, expect_errors=False):
headers = headers or {}
full_path = self._get_full_path(path)
response = self.app.put_json(full_path,
@ -128,17 +129,22 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
expect_errors=expect_errors)
return response
def create_load_balancer(self, vip, **optionals):
req_dict = {'vip': vip}
def create_load_balancer(self, vip_subnet_id,
**optionals):
req_dict = {'vip_subnet_id': vip_subnet_id}
req_dict.update(optionals)
response = self.post(self.LBS_PATH, req_dict)
body = {'loadbalancer': req_dict}
response = self.post(self.LBS_PATH, body)
return response.json
def create_listener(self, lb_id, protocol, protocol_port, **optionals):
req_dict = {'protocol': protocol, 'protocol_port': protocol_port}
def create_listener(self, protocol, protocol_port, lb_id,
**optionals):
req_dict = {'protocol': protocol, 'protocol_port': protocol_port,
'load_balancer_id': lb_id}
req_dict.update(optionals)
path = self.LISTENERS_PATH.format(lb_id=lb_id)
response = self.post(path, req_dict)
path = self.LISTENERS_PATH
body = {'listener': req_dict}
response = self.post(path, body)
return response.json
def create_listener_stats(self, listener_id, amphora_id):
@ -160,59 +166,66 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
**opts)
return amphora
def get_listener(self, lb_id, listener_id):
path = self.LISTENER_PATH.format(lb_id=lb_id, listener_id=listener_id)
def get_listener(self, listener_id):
path = self.LISTENER_PATH.format(listener_id=listener_id)
response = self.get(path)
return response.json
def create_pool_sans_listener(self, lb_id, protocol, lb_algorithm,
**optionals):
req_dict = {'protocol': protocol, 'lb_algorithm': lb_algorithm}
def create_pool_with_listener(self, lb_id, listener_id, protocol,
lb_algorithm, **optionals):
req_dict = {'load_balancer_id': lb_id, 'listener_id': listener_id,
'protocol': protocol, 'lb_algorithm': lb_algorithm}
req_dict.update(optionals)
path = self.POOLS_PATH.format(lb_id=lb_id)
response = self.post(path, req_dict)
body = {'pool': req_dict}
path = self.POOLS_PATH
response = self.post(path, body)
return response.json
def create_pool(self, lb_id, listener_id, protocol, lb_algorithm,
**optionals):
req_dict = {'protocol': protocol, 'lb_algorithm': lb_algorithm}
def create_pool(self, lb_id, protocol, lb_algorithm, **optionals):
req_dict = {'load_balancer_id': lb_id, 'protocol': protocol,
'lb_algorithm': lb_algorithm}
req_dict.update(optionals)
path = self.DEPRECATED_POOLS_PATH.format(lb_id=lb_id,
listener_id=listener_id)
response = self.post(path, req_dict)
body = {'pool': req_dict}
path = self.POOLS_PATH
response = self.post(path, body)
return response.json
def create_member(self, lb_id, pool_id, ip_address,
protocol_port, expect_error=False, **optionals):
def create_member(self, pool_id, ip_address, protocol_port,
expect_error=False, **optionals):
req_dict = {'ip_address': ip_address, 'protocol_port': protocol_port}
req_dict.update(optionals)
path = self.MEMBERS_PATH.format(lb_id=lb_id, pool_id=pool_id)
response = self.post(path, req_dict, expect_errors=expect_error)
body = {'member': req_dict}
path = self.MEMBERS_PATH.format(pool_id=pool_id)
response = self.post(path, body, expect_errors=expect_error)
return response.json
def create_member_with_listener(self, lb_id, listener_id, pool_id,
ip_address, protocol_port, **optionals):
req_dict = {'ip_address': ip_address, 'protocol_port': protocol_port}
def create_member_with_listener(self, pool_id, listener_id, ip_address,
protocol_port, **optionals):
req_dict = {'listener_id': listener_id, 'ip_address': ip_address,
'protocol_port': protocol_port}
req_dict.update(optionals)
path = self.DEPRECATED_MEMBERS_PATH.format(
lb_id=lb_id, listener_id=listener_id, pool_id=pool_id)
response = self.post(path, req_dict)
body = {'member': req_dict}
path = self.MEMBERS_PATH.format(pool_id=pool_id)
response = self.post(path, body)
return response.json
def create_health_monitor(self, lb_id, pool_id, type,
delay, timeout, fall_threshold, rise_threshold,
# TODO(sindhu): Will be modified later in the Health_Monitor review
def create_health_monitor(self, lb_id, type, delay, timeout,
fall_threshold, rise_threshold, root_tag=None,
**optionals):
req_dict = {'type': type,
req_dict = {'load_balancer_id': lb_id, 'type': type,
'delay': delay,
'timeout': timeout,
'fall_threshold': fall_threshold,
'rise_threshold': rise_threshold}
req_dict.update(optionals)
path = self.HM_PATH.format(lb_id=lb_id,
pool_id=pool_id)
response = self.post(path, req_dict)
body = {root_tag: req_dict}
path = self.HMS_PATH
response = self.post(path, body)
return response.json
# TODO(sindhu): Will be modified according to the
# health_monitor_test_cases later
def create_health_monitor_with_listener(
self, lb_id, listener_id, pool_id, type,
delay, timeout, fall_threshold, rise_threshold, **optionals):
@ -227,21 +240,21 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
response = self.post(path, req_dict)
return response.json
def create_l7policy(self, lb_id, listener_id, action, **optionals):
req_dict = {'action': action}
def create_l7policy(self, listener_id, action, **optionals):
req_dict = {'listener_id': listener_id, 'action': action}
req_dict.update(optionals)
path = self.L7POLICIES_PATH.format(lb_id=lb_id,
listener_id=listener_id)
response = self.post(path, req_dict)
body = {'l7policy': req_dict}
path = self.L7POLICIES_PATH
response = self.post(path, body)
return response.json
def create_l7rule(self, lb_id, listener_id, l7policy_id, type,
compare_type, value, **optionals):
def create_l7rule(self, l7policy_id, type, compare_type,
value, **optionals):
req_dict = {'type': type, 'compare_type': compare_type, 'value': value}
req_dict.update(optionals)
path = self.L7RULES_PATH.format(lb_id=lb_id, listener_id=listener_id,
l7policy_id=l7policy_id)
response = self.post(path, req_dict)
body = {'l7rule': req_dict}
path = self.L7RULES_PATH.format(l7policy_id=l7policy_id)
response = self.post(path, body)
return response.json
def _set_lb_and_children_statuses(self, lb_id, prov_status, op_status):
@ -289,22 +302,23 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
expected_prov_status = constants.DELETED
expected_op_status = constants.OFFLINE
self.set_lb_status(lb_id, status=expected_prov_status)
self.assert_correct_listener_status(lb_id, listener_id,
expected_prov_status,
expected_op_status)
self.assert_correct_listener_status(expected_prov_status,
expected_op_status,
listener_id)
def assert_correct_lb_status(self, lb_id, provisioning_status,
operating_status):
api_lb = self.get(self.LB_PATH.format(lb_id=lb_id)).json
api_lb = self.get(
self.LB_PATH.format(lb_id=lb_id)).json.get('loadbalancer')
self.assertEqual(provisioning_status,
api_lb.get('provisioning_status'))
self.assertEqual(operating_status,
api_lb.get('operating_status'))
def assert_correct_listener_status(self, lb_id, listener_id,
provisioning_status, operating_status):
def assert_correct_listener_status(self, provisioning_status,
operating_status, listener_id):
api_listener = self.get(self.LISTENER_PATH.format(
lb_id=lb_id, listener_id=listener_id)).json
listener_id=listener_id)).json.get('listener')
self.assertEqual(provisioning_status,
api_listener.get('provisioning_status'))
self.assertEqual(operating_status,

View File

@ -0,0 +1,832 @@
# Copyright 2014 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mock
from oslo_utils import uuidutils
from octavia.common import constants
from octavia.network import base as network_base
from octavia.tests.functional.api.v2 import base
import testtools
class TestLoadBalancer(base.BaseAPITest):
def _build_body(self, json):
return {'loadbalancer': json}
def test_empty_list(self):
response = self.get(self.LBS_PATH)
api_list = response.json
self.assertEqual({'loadbalancers': []}, api_list)
def test_create(self, **optionals):
lb_json = {'name': 'test1',
'vip_subnet_id': uuidutils.generate_uuid(),
'project_id': self.project_id
}
lb_json.update(optionals)
body = self._build_body(lb_json)
response = self.post(self.LBS_PATH, body)
api_lb = response.json.get('loadbalancer')
self.assertTrue(uuidutils.is_uuid_like(api_lb.get('id')))
self.assertEqual(lb_json.get('name'), api_lb.get('name'))
self.assertEqual(constants.PENDING_CREATE,
api_lb.get('provisioning_status'))
self.assertEqual(constants.OFFLINE,
api_lb.get('operating_status'))
self.assertTrue(api_lb.get('admin_state_up'))
self.assertIsNotNone(api_lb.get('created_at'))
self.assertIsNone(api_lb.get('updated_at'))
for key, value in optionals.items():
self.assertEqual(value, lb_json.get(key))
self.assert_final_lb_statuses(api_lb.get('id'))
def test_create_with_duplicate_id(self):
lb = self.create_load_balancer(
uuidutils.generate_uuid()).get('loadbalancer')
body = self._build_body({'id': lb.get('id'),
'vip_subnet_id': uuidutils.generate_uuid()})
self.post(self.LBS_PATH, body,
status=409, expect_errors=True)
def test_create_without_vip(self):
lb_json = {'name': 'test1'}
self.post(self.LB_PATH, lb_json, status=400)
def test_create_with_project_id(self):
self.test_create(project_id=uuidutils.generate_uuid())
def test_get_all(self):
root_tag = 'loadbalancer'
lb1 = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1')
lb2 = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb2')
lb3 = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb3')
response = self.get(self.LBS_PATH,
params={'project_id': self.project_id})
lbs = response.json.get(root_tag)
# Same get all project_id issue
if lbs is not None:
lb_id_names = [(lb.get('id'), lb.get('name')) for lb in lbs]
self.assertEqual(3, len(lbs))
self.assertIn((lb1.get('id'), lb1.get('name')), lb_id_names)
self.assertIn((lb2.get('id'), lb2.get('name')), lb_id_names)
self.assertIn((lb3.get('id'), lb3.get('name')), lb_id_names)
def test_get_all_by_project_id(self):
project1_id = uuidutils.generate_uuid()
project2_id = uuidutils.generate_uuid()
tag = 'loadbalancer'
lb1 = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
project_id=project1_id)
lb2 = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb2',
project_id=project1_id)
lb3 = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb3',
project_id=project2_id)
project1_path = "{0}?project_id={1}".format(self.LBS_PATH, project1_id)
response = self.get(project1_path)
lbs = response.json.get('loadbalancers')
self.assertEqual(2, len(lbs))
lb_id_names = [(lb.get('id'), lb.get('name')) for lb in lbs]
self.assertEqual(2, len(lbs))
lb1 = lb1.get(tag)
lb2 = lb2.get(tag)
lb3 = lb3.get(tag)
self.assertIn((lb1.get('id'), lb1.get('name')), lb_id_names)
self.assertIn((lb2.get('id'), lb2.get('name')), lb_id_names)
project2_path = "{0}?project_id={1}".format(self.LBS_PATH,
project2_id)
response = self.get(project2_path)
lbs = response.json.get('loadbalancers')
lb_id_names = [(lb.get('id'), lb.get('name')) for lb in lbs]
self.assertEqual(1, len(lbs))
self.assertIn((lb3.get('id'), lb3.get('name')), lb_id_names)
def test_get(self):
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
description='desc1',
admin_state_up=False)
lb_dict = lb.get('loadbalancer')
response = self.get(
self.LB_PATH.format(
lb_id=lb_dict.get('id'))).json.get('loadbalancer')
self.assertEqual('lb1', response.get('name'))
self.assertEqual('desc1', response.get('description'))
self.assertFalse(response.get('admin_state_up'))
def test_get_bad_lb_id(self):
path = self.LB_PATH.format(lb_id='SEAN-CONNERY')
self.get(path, status=404)
def test_create_with_vip(self):
lb_json = {'vip_address': '10.0.0.1',
'vip_subnet_id': uuidutils.generate_uuid(),
'vip_port_id': uuidutils.generate_uuid(),
'name': 'test1', 'description': 'test1_desc',
'admin_state_up': False,
'project_id': self.project_id}
body = self._build_body(lb_json)
response = self.post(self.LBS_PATH, body)
api_lb = response.json.get('loadbalancer')
self.assertTrue(uuidutils.is_uuid_like(api_lb.get('id')))
self.assertEqual(lb_json.get('name'), api_lb.get('name'))
self.assertEqual(lb_json.get('description'), api_lb.get('description'))
self.assertEqual(constants.PENDING_CREATE,
api_lb['provisioning_status'])
self.assertEqual(constants.OFFLINE,
api_lb['operating_status'])
self.assertEqual(lb_json.get('admin_state_up'),
api_lb.get('admin_state_up'))
self.assertEqual(lb_json.get('vip_address'),
api_lb.get('vip_address'))
self.assertEqual(lb_json.get('vip_subnet_id'),
api_lb.get('vip_subnet_id'))
self.assertEqual(lb_json.get('vip_port_id'),
api_lb.get('vip_port_id'))
self.assert_final_lb_statuses(api_lb.get('id'))
def test_create_with_long_name(self):
lb_json = {'name': 'n' * 256, 'vip': {}}
self.post(self.LBS_PATH, lb_json, status=400)
def test_create_with_long_description(self):
lb_json = {'description': 'n' * 256, 'vip': {}}
self.post(self.LBS_PATH, lb_json, status=400)
def test_create_with_nonuuid_vip_attributes(self):
lb_json = {'vip': {'subnet_id': 'HI'}}
self.post(self.LBS_PATH, lb_json, status=400)
def test_update(self):
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
description='desc1',
admin_state_up=False)
lb_dict = lb.get('loadbalancer')
lb_json = self._build_body({'name': 'lb2'})
lb = self.set_lb_status(lb_dict.get('id'))
response = self.put(self.LB_PATH.format(lb_id=lb_dict.get('id')),
lb_json)
api_lb = response.json.get('loadbalancer')
self.assertIsNotNone(api_lb.get('vip_subnet_id'))
self.assertEqual('lb1', api_lb.get('name'))
self.assertEqual('desc1', api_lb.get('description'))
self.assertFalse(api_lb.get('admin_state_up'))
self.assertEqual(lb.get('operational_status'),
api_lb.get('operational_status'))
self.assertIsNotNone(api_lb.get('created_at'))
self.assertIsNotNone(api_lb.get('updated_at'))
self.assert_final_lb_statuses(api_lb.get('id'))
def test_update_with_vip(self):
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
description='desc1',
admin_state_up=False)
lb_dict = lb.get('loadbalancer')
lb_json = self._build_body({'vip_subnet_id': '1234'})
lb = self.set_lb_status(lb_dict.get('id'))
self.put(self.LB_PATH.format(lb_id=lb_dict.get('id')),
lb_json, status=400)
def test_update_bad_lb_id(self):
path = self.LB_PATH.format(lb_id='SEAN-CONNERY')
self.put(path, body={}, status=404)
def test_update_pending_create(self):
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
description='desc1',
admin_state_up=False)
lb_dict = lb.get('loadbalancer')
lb_json = self._build_body({'name': 'Roberto'})
self.put(self.LB_PATH.format(lb_id=lb_dict.get('id')),
lb_json, status=409)
def test_delete_pending_create(self):
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
description='desc1',
admin_state_up=False)
lb_dict = lb.get('loadbalancer')
self.delete(self.LB_PATH.format(lb_id=lb_dict.get('id')), status=409)
def test_update_pending_update(self):
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
description='desc1',
admin_state_up=False)
lb_dict = lb.get('loadbalancer')
lb_json = self._build_body({'name': 'Bob'})
lb = self.set_lb_status(lb_dict.get('id'))
self.put(self.LB_PATH.format(lb_id=lb_dict.get('id')), lb_json)
self.put(self.LB_PATH.format(lb_id=lb_dict.get('id')),
lb_json, status=409)
def test_delete_pending_update(self):
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
description='desc1',
admin_state_up=False)
lb_json = self._build_body({'name': 'Steve'})
lb_dict = lb.get('loadbalancer')
lb = self.set_lb_status(lb_dict.get('id'))
self.put(self.LB_PATH.format(lb_id=lb_dict.get('id')), lb_json,
status=200)
self.delete(self.LB_PATH.format(lb_id=lb_dict.get('id')), status=409)
def test_delete_with_error_status(self):
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
description='desc1',
admin_state_up=False)
lb_dict = lb.get('loadbalancer')
lb = self.set_lb_status(lb_dict.get('id'), status=constants.ERROR)
self.delete(self.LB_PATH.format(lb_id=lb_dict.get('id')), status=204)
def test_update_pending_delete(self):
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
description='desc1',
admin_state_up=False)
lb_dict = lb.get('loadbalancer')
lb = self.set_lb_status(lb_dict.get('id'))
self.delete(self.LB_PATH.format(lb_id=lb_dict.get('id')))
lb_json = self._build_body({'name': 'John'})
self.put(self.LB_PATH.format(lb_id=lb_dict.get('id')),
lb_json, status=409)
def test_delete_pending_delete(self):
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
description='desc1',
admin_state_up=False)
lb_dict = lb.get('loadbalancer')
lb = self.set_lb_status(lb_dict.get('id'))
self.delete(self.LB_PATH.format(lb_id=lb_dict.get('id')))
self.delete(self.LB_PATH.format(lb_id=lb_dict.get('id')), status=409)
def test_delete(self):
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
description='desc1',
admin_state_up=False)
lb_dict = lb.get('loadbalancer')
lb = self.set_lb_status(lb_dict.get('id'))
self.delete(self.LB_PATH.format(lb_id=lb_dict.get('id')))
response = self.get(self.LB_PATH.format(lb_id=lb_dict.get('id')))
api_lb = response.json.get('loadbalancer')
self.assertEqual('lb1', api_lb.get('name'))
self.assertEqual('desc1', api_lb.get('description'))
self.assertFalse(api_lb.get('admin_state_up'))
self.assertEqual(lb.get('operational_status'),
api_lb.get('operational_status'))
self.assert_final_lb_statuses(api_lb.get('id'), delete=True)
def test_delete_bad_lb_id(self):
path = self.LB_PATH.format(lb_id='bad_uuid')
self.delete(path, status=404)
def test_create_with_bad_subnet(self, **optionals):
with mock.patch(
'octavia.common.utils.get_network_driver') as net_mock:
net_mock.return_value.get_subnet = mock.Mock(
side_effect=network_base.SubnetNotFound('Subnet not found'))
subnet_id = uuidutils.generate_uuid()
lb_json = {'name': 'test1',
'vip_subnet_id': subnet_id,
'vip_address': '10.0.0.1',
'project_id': self.project_id}
lb_json.update(optionals)
body = self._build_body(lb_json)
response = self.post(self.LBS_PATH, body, expect_errors=True)
err_msg = 'Subnet ' + subnet_id + ' not found.'
self.assertEqual(response.json.get('faultstring'), err_msg)
def test_create_with_valid_subnet(self, **optionals):
subnet_id = uuidutils.generate_uuid()
with mock.patch(
'octavia.common.utils.get_network_driver') as net_mock:
net_mock.return_value.get_subnet.return_value = subnet_id
lb_json = {'name': 'test1',
'vip_subnet_id': subnet_id,
'vip_address': '10.0.0.1',
'project_id': self.project_id}
lb_json.update(optionals)
body = self._build_body(lb_json)
response = self.post(self.LBS_PATH, body)
api_lb = response.json.get('loadbalancer')
self.assertEqual(lb_json.get('vip_subnet_id'),
api_lb.get('vip_subnet_id'))
class TestLoadBalancerGraph(base.BaseAPITest):
def setUp(self):
super(TestLoadBalancerGraph, self).setUp()
self._project_id = uuidutils.generate_uuid()
def _assert_graphs_equal(self, expected_graph, observed_graph):
observed_graph_copy = copy.deepcopy(observed_graph)
del observed_graph_copy['created_at']
del observed_graph_copy['updated_at']
obs_lb_id = observed_graph_copy.pop('id')
self.assertTrue(uuidutils.is_uuid_like(obs_lb_id))
expected_listeners = expected_graph.pop('listeners', [])
observed_listeners = observed_graph_copy.pop('listeners', [])
self.assertEqual(expected_graph, observed_graph_copy)
for observed_listener in observed_listeners:
del observed_listener['created_at']
del observed_listener['updated_at']
self.assertTrue(uuidutils.is_uuid_like(
observed_listener.pop('id')))
default_pool = observed_listener.get('default_pool')
if default_pool:
observed_listener.pop('default_pool_id')
self.assertTrue(default_pool.get('id'))
default_pool.pop('id')
default_pool.pop('created_at')
default_pool.pop('updated_at')
hm = default_pool.get('healthmonitor')
if hm:
self.assertTrue(hm.get('id'))
hm.pop('id')
for member in default_pool.get('members', []):
self.assertTrue(member.get('id'))
member.pop('id')
member.pop('created_at')
member.pop('updated_at')
if observed_listener.get('sni_containers'):
observed_listener['sni_containers'].sort()
o_l7policies = observed_listener.get('l7policies')
if o_l7policies:
for o_l7policy in o_l7policies:
if o_l7policy.get('redirect_pool'):
r_pool = o_l7policy.get('redirect_pool')
self.assertTrue(r_pool.get('id'))
r_pool.pop('id')
r_pool.pop('created_at')
r_pool.pop('updated_at')
self.assertTrue(o_l7policy.get('redirect_pool_id'))
o_l7policy.pop('redirect_pool_id')
if r_pool.get('members'):
for r_member in r_pool.get('members'):
self.assertTrue(r_member.get('id'))
r_member.pop('id')
r_member.pop('created_at')
r_member.pop('updated_at')
self.assertTrue(o_l7policy.get('id'))
o_l7policy.pop('id')
l7rules = o_l7policy.get('l7rules')
for l7rule in l7rules:
self.assertTrue(l7rule.get('id'))
l7rule.pop('id')
self.assertIn(observed_listener, expected_listeners)
def _get_lb_bodies(self, create_listeners, expected_listeners):
create_lb = {
'name': 'lb1',
'project_id': self._project_id,
'vip': {},
'listeners': create_listeners
}
expected_lb = {
'description': None,
'enabled': True,
'provisioning_status': constants.PENDING_CREATE,
'operating_status': constants.OFFLINE
}
expected_lb.update(create_lb)
expected_lb['listeners'] = expected_listeners
expected_lb['vip'] = {'ip_address': None, 'port_id': None,
'subnet_id': None}
return create_lb, expected_lb
def _get_listener_bodies(self, name='listener1', protocol_port=80,
create_default_pool=None,
expected_default_pool=None,
create_l7policies=None,
expected_l7policies=None,
create_sni_containers=None,
expected_sni_containers=None):
create_listener = {
'name': name,
'protocol_port': protocol_port,
'protocol': constants.PROTOCOL_HTTP,
'project_id': self._project_id
}
expected_listener = {
'description': None,
'tls_certificate_id': None,
'sni_containers': [],
'connection_limit': None,
'enabled': True,
'provisioning_status': constants.PENDING_CREATE,
'operating_status': constants.OFFLINE,
'insert_headers': {}
}
if create_sni_containers:
create_listener['sni_containers'] = create_sni_containers
expected_listener.update(create_listener)
if create_default_pool:
pool = create_default_pool
create_listener['default_pool'] = pool
if pool.get('id'):
create_listener['default_pool_id'] = pool['id']
if create_l7policies:
l7policies = create_l7policies
create_listener['l7policies'] = l7policies
if expected_default_pool:
expected_listener['default_pool'] = expected_default_pool
if expected_sni_containers:
expected_listener['sni_containers'] = expected_sni_containers
if expected_l7policies:
expected_listener['l7policies'] = expected_l7policies
return create_listener, expected_listener
def _get_pool_bodies(self, name='pool1', create_members=None,
expected_members=None, create_hm=None,
expected_hm=None, protocol=constants.PROTOCOL_HTTP,
session_persistence=True):
create_pool = {
'name': name,
'protocol': protocol,
'lb_algorithm': constants.LB_ALGORITHM_ROUND_ROBIN,
'project_id': self._project_id
}
if session_persistence:
create_pool['session_persistence'] = {
'type': constants.SESSION_PERSISTENCE_SOURCE_IP,
'cookie_name': None}
if create_members:
create_pool['members'] = create_members
if create_hm:
create_pool['health_monitor'] = create_hm
expected_pool = {
'description': None,
'session_persistence': None,
'members': [],
'enabled': True,
'operating_status': constants.OFFLINE
}
expected_pool.update(create_pool)
if expected_members:
expected_pool['members'] = expected_members
if expected_hm:
expected_pool['health_monitor'] = expected_hm
return create_pool, expected_pool
def _get_member_bodies(self, protocol_port=80):
create_member = {
'ip_address': '10.0.0.1',
'protocol_port': protocol_port,
'project_id': self._project_id
}
expected_member = {
'weight': 1,
'enabled': True,
'subnet_id': None,
'operating_status': constants.OFFLINE
}
expected_member.update(create_member)
return create_member, expected_member
def _get_hm_bodies(self):
create_hm = {
'type': constants.HEALTH_MONITOR_PING,
'delay': 1,
'timeout': 1,
'fall_threshold': 1,
'rise_threshold': 1,
'project_id': self._project_id
}
expected_hm = {
'http_method': 'GET',
'url_path': '/',
'expected_codes': '200',
'enabled': True
}
expected_hm.update(create_hm)
return create_hm, expected_hm
def _get_sni_container_bodies(self):
create_sni_container1 = uuidutils.generate_uuid()
create_sni_container2 = uuidutils.generate_uuid()
create_sni_containers = [create_sni_container1, create_sni_container2]
expected_sni_containers = [create_sni_container1,
create_sni_container2]
expected_sni_containers.sort()
return create_sni_containers, expected_sni_containers
def _get_l7policies_bodies(self, create_pool=None, expected_pool=None,
create_l7rules=None, expected_l7rules=None):
create_l7policies = []
if create_pool:
create_l7policy = {
'action': constants.L7POLICY_ACTION_REDIRECT_TO_POOL,
'redirect_pool': create_pool,
'position': 1,
'enabled': False
}
else:
create_l7policy = {
'action': constants.L7POLICY_ACTION_REDIRECT_TO_URL,
'redirect_url': 'http://127.0.0.1/',
'position': 1,
'enabled': False
}
create_l7policies.append(create_l7policy)
expected_l7policy = {
'name': None,
'description': None,
'redirect_url': None,
'l7rules': []
}
expected_l7policy.update(create_l7policy)
expected_l7policies = []
if expected_pool:
if create_pool.get('id'):
expected_l7policy['redirect_pool_id'] = create_pool.get('id')
expected_l7policy['redirect_pool'] = expected_pool
expected_l7policies.append(expected_l7policy)
if expected_l7rules:
expected_l7policies[0]['l7rules'] = expected_l7rules
if create_l7rules:
create_l7policies[0]['l7rules'] = create_l7rules
return create_l7policies, expected_l7policies
def _get_l7rules_bodies(self, value="localhost"):
create_l7rules = [{
'type': constants.L7RULE_TYPE_HOST_NAME,
'compare_type': constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'value': value,
'invert': False
}]
expected_l7rules = [{
'key': None
}]
expected_l7rules[0].update(create_l7rules[0])
return create_l7rules, expected_l7rules
@testtools.skip('Skip until complete v2 merge')
def test_with_one_listener(self):
create_listener, expected_listener = self._get_listener_bodies()
create_lb, expected_lb = self._get_lb_bodies([create_listener],
[expected_listener])
response = self.post(self.LBS_PATH, create_lb)
api_lb = response.json
self._assert_graphs_equal(expected_lb, api_lb)
@testtools.skip('Skip until complete v2 merge')
def test_with_many_listeners(self):
create_listener1, expected_listener1 = self._get_listener_bodies()
create_listener2, expected_listener2 = self._get_listener_bodies(
name='listener2', protocol_port=81
)
create_lb, expected_lb = self._get_lb_bodies(
[create_listener1, create_listener2],
[expected_listener1, expected_listener2])
response = self.post(self.LBS_PATH, create_lb)
api_lb = response.json
self._assert_graphs_equal(expected_lb, api_lb)
@testtools.skip('Skip until complete v2 merge')
def test_with_one_listener_one_pool(self):
create_pool, expected_pool = self._get_pool_bodies()
create_listener, expected_listener = self._get_listener_bodies(
create_default_pool=create_pool,
expected_default_pool=expected_pool
)
create_lb, expected_lb = self._get_lb_bodies([create_listener],
[expected_listener])
response = self.post(self.LBS_PATH, create_lb)
api_lb = response.json
self._assert_graphs_equal(expected_lb, api_lb)
@testtools.skip('Skip until complete v2 merge')
def test_with_many_listeners_one_pool(self):
create_pool1, expected_pool1 = self._get_pool_bodies()
create_pool2, expected_pool2 = self._get_pool_bodies(name='pool2')
create_listener1, expected_listener1 = self._get_listener_bodies(
create_default_pool=create_pool1,
expected_default_pool=expected_pool1
)
create_listener2, expected_listener2 = self._get_listener_bodies(
create_default_pool=create_pool2,
expected_default_pool=expected_pool2,
name='listener2', protocol_port=81
)
create_lb, expected_lb = self._get_lb_bodies(
[create_listener1, create_listener2],
[expected_listener1, expected_listener2])
response = self.post(self.LBS_PATH, create_lb)
api_lb = response.json
self._assert_graphs_equal(expected_lb, api_lb)
@testtools.skip('Skip until complete v2 merge')
def test_with_one_listener_one_member(self):
create_member, expected_member = self._get_member_bodies()
create_pool, expected_pool = self._get_pool_bodies(
create_members=[create_member],
expected_members=[expected_member])
create_listener, expected_listener = self._get_listener_bodies(
create_default_pool=create_pool,
expected_default_pool=expected_pool)
create_lb, expected_lb = self._get_lb_bodies([create_listener],
[expected_listener])
response = self.post(self.LBS_PATH, create_lb)
api_lb = response.json
self._assert_graphs_equal(expected_lb, api_lb)
@testtools.skip('Skip until complete v2 merge')
def test_with_one_listener_one_hm(self):
create_hm, expected_hm = self._get_hm_bodies()
create_pool, expected_pool = self._get_pool_bodies(
create_hm=create_hm,
expected_hm=expected_hm)
create_listener, expected_listener = self._get_listener_bodies(
create_default_pool=create_pool,
expected_default_pool=expected_pool)
create_lb, expected_lb = self._get_lb_bodies([create_listener],
[expected_listener])
response = self.post(self.LBS_PATH, create_lb)
api_lb = response.json
self._assert_graphs_equal(expected_lb, api_lb)
@testtools.skip('Skip until complete v2 merge')
def test_with_one_listener_sni_containers(self):
create_sni_containers, expected_sni_containers = (
self._get_sni_container_bodies())
create_listener, expected_listener = self._get_listener_bodies(
create_sni_containers=create_sni_containers,
expected_sni_containers=expected_sni_containers)
create_lb, expected_lb = self._get_lb_bodies([create_listener],
[expected_listener])
response = self.post(self.LBS_PATH, create_lb)
api_lb = response.json
self._assert_graphs_equal(expected_lb, api_lb)
@testtools.skip('Skip until complete v2 merge')
def test_with_l7policy_redirect_pool_no_rule(self):
create_pool, expected_pool = self._get_pool_bodies(create_members=[],
expected_members=[])
create_l7policies, expected_l7policies = self._get_l7policies_bodies(
create_pool=create_pool, expected_pool=expected_pool)
create_listener, expected_listener = self._get_listener_bodies(
create_l7policies=create_l7policies,
expected_l7policies=expected_l7policies)
create_lb, expected_lb = self._get_lb_bodies([create_listener],
[expected_listener])
response = self.post(self.LBS_PATH, create_lb)
api_lb = response.json
self._assert_graphs_equal(expected_lb, api_lb)
@testtools.skip('Skip until complete v2 merge')
def test_with_l7policy_redirect_pool_one_rule(self):
create_pool, expected_pool = self._get_pool_bodies(create_members=[],
expected_members=[])
create_l7rules, expected_l7rules = self._get_l7rules_bodies()
create_l7policies, expected_l7policies = self._get_l7policies_bodies(
create_pool=create_pool, expected_pool=expected_pool,
create_l7rules=create_l7rules, expected_l7rules=expected_l7rules)
create_listener, expected_listener = self._get_listener_bodies(
create_l7policies=create_l7policies,
expected_l7policies=expected_l7policies)
create_lb, expected_lb = self._get_lb_bodies([create_listener],
[expected_listener])
response = self.post(self.LBS_PATH, create_lb)
api_lb = response.json
self._assert_graphs_equal(expected_lb, api_lb)
@testtools.skip('Skip until complete v2 merge')
def test_with_l7policy_redirect_pool_bad_rule(self):
create_pool, expected_pool = self._get_pool_bodies(create_members=[],
expected_members=[])
create_l7rules, expected_l7rules = self._get_l7rules_bodies(
value="local host")
create_l7policies, expected_l7policies = self._get_l7policies_bodies(
create_pool=create_pool, expected_pool=expected_pool,
create_l7rules=create_l7rules, expected_l7rules=expected_l7rules)
create_listener, expected_listener = self._get_listener_bodies(
create_l7policies=create_l7policies,
expected_l7policies=expected_l7policies)
create_lb, expected_lb = self._get_lb_bodies([create_listener],
[expected_listener])
self.post(self.LBS_PATH, create_lb, expect_errors=True)
@testtools.skip('Skip until complete v2 merge')
def test_with_l7policies_one_redirect_pool_one_rule(self):
create_pool, expected_pool = self._get_pool_bodies(create_members=[],
expected_members=[])
create_l7rules, expected_l7rules = self._get_l7rules_bodies()
create_l7policies, expected_l7policies = self._get_l7policies_bodies(
create_pool=create_pool, expected_pool=expected_pool,
create_l7rules=create_l7rules, expected_l7rules=expected_l7rules)
c_l7policies_url, e_l7policies_url = self._get_l7policies_bodies()
for policy in c_l7policies_url:
policy['position'] = 2
create_l7policies.append(policy)
for policy in e_l7policies_url:
policy['position'] = 2
expected_l7policies.append(policy)
create_listener, expected_listener = self._get_listener_bodies(
create_l7policies=create_l7policies,
expected_l7policies=expected_l7policies)
create_lb, expected_lb = self._get_lb_bodies([create_listener],
[expected_listener])
response = self.post(self.LBS_PATH, create_lb)
api_lb = response.json
self._assert_graphs_equal(expected_lb, api_lb)
@testtools.skip('Skip until complete v2 merge')
def test_with_l7policies_redirect_pools_no_rules(self):
create_pool, expected_pool = self._get_pool_bodies()
create_l7policies, expected_l7policies = self._get_l7policies_bodies(
create_pool=create_pool, expected_pool=expected_pool)
r_create_pool, r_expected_pool = self._get_pool_bodies()
c_l7policies_url, e_l7policies_url = self._get_l7policies_bodies(
create_pool=r_create_pool, expected_pool=r_expected_pool)
for policy in c_l7policies_url:
policy['position'] = 2
create_l7policies.append(policy)
for policy in e_l7policies_url:
policy['position'] = 2
expected_l7policies.append(policy)
create_listener, expected_listener = self._get_listener_bodies(
create_l7policies=create_l7policies,
expected_l7policies=expected_l7policies)
create_lb, expected_lb = self._get_lb_bodies([create_listener],
[expected_listener])
response = self.post(self.LBS_PATH, create_lb)
api_lb = response.json
self._assert_graphs_equal(expected_lb, api_lb)
@testtools.skip('Skip until complete v2 merge')
def test_with_one_of_everything(self):
create_member, expected_member = self._get_member_bodies()
create_hm, expected_hm = self._get_hm_bodies()
create_pool, expected_pool = self._get_pool_bodies(
create_members=[create_member],
expected_members=[expected_member],
create_hm=create_hm,
expected_hm=expected_hm,
protocol=constants.PROTOCOL_TCP)
create_sni_containers, expected_sni_containers = (
self._get_sni_container_bodies())
create_l7rules, expected_l7rules = self._get_l7rules_bodies()
r_create_member, r_expected_member = self._get_member_bodies(
protocol_port=88)
r_create_pool, r_expected_pool = self._get_pool_bodies(
create_members=[r_create_member],
expected_members=[r_expected_member])
create_l7policies, expected_l7policies = self._get_l7policies_bodies(
create_pool=r_create_pool, expected_pool=r_expected_pool,
create_l7rules=create_l7rules, expected_l7rules=expected_l7rules)
create_listener, expected_listener = self._get_listener_bodies(
create_default_pool=create_pool,
expected_default_pool=expected_pool,
create_l7policies=create_l7policies,
expected_l7policies=expected_l7policies,
create_sni_containers=create_sni_containers,
expected_sni_containers=expected_sni_containers)
create_lb, expected_lb = self._get_lb_bodies([create_listener],
[expected_listener])
response = self.post(self.LBS_PATH, create_lb)
api_lb = response.json
self._assert_graphs_equal(expected_lb, api_lb)
@testtools.skip('Skip until complete v2 merge')
def test_db_create_failure(self):
create_listener, expected_listener = self._get_listener_bodies()
create_lb, _ = self._get_lb_bodies([create_listener],
[expected_listener])
with mock.patch('octavia.db.repositories.Repositories.'
'create_load_balancer_tree') as repo_mock:
repo_mock.side_effect = Exception('I am a DB Error')
response = self.post(self.LBS_PATH, create_lb, expect_errors=True)
self.assertEqual(500, response.status_code)

View File

@ -37,6 +37,7 @@ class TestTypeRenameSubset(types.BaseType):
class TestTypeTenantProject(types.BaseType):
tenant_id = wtypes.wsattr(wtypes.StringType())
project_id = wtypes.wsattr(wtypes.StringType())
class ChildTestModel(data_models.BaseDataModel):
@ -153,3 +154,8 @@ class TestDataModelToDict(base.TestCase):
def test_to_dict_recurse(self):
self.assertEqual(self.model.to_dict(recurse=True),
self.RECURSED_RESULT)
def test_type_to_dict_with_project_id(self):
type_dict = TestTypeTenantProject(project_id='1234').to_dict()
self.assertEqual('1234', type_dict['project_id'])
self.assertNotIn('tenant_id', type_dict)

View File

@ -0,0 +1,99 @@
# Copyright 2014 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
from wsme import exc
from wsme.rest import json as wsme_json
from wsme import types as wsme_types
from octavia.api.v2.types import load_balancer as lb_type
from octavia.tests.unit.api.common import base
class TestLoadBalancer(object):
_type = None
def test_load_balancer(self):
body = {"name": "test_name", "description": "test_description",
"vip_subnet_id": uuidutils.generate_uuid()}
lb = wsme_json.fromjson(self._type, body)
self.assertTrue(lb.admin_state_up)
def test_invalid_name(self):
body = {"name": 0}
self.assertRaises(exc.InvalidInput, wsme_json.fromjson, self._type,
body)
def test_name_length(self):
body = {"name": "x" * 256}
self.assertRaises(exc.InvalidInput, wsme_json.fromjson, self._type,
body)
def test_invalid_description(self):
body = {"description": 0}
self.assertRaises(exc.InvalidInput, wsme_json.fromjson, self._type,
body)
def test_description_length(self):
body = {"name": "x" * 256}
self.assertRaises(exc.InvalidInput, wsme_json.fromjson, self._type,
body)
def test_invalid_enabled(self):
body = {"admin_state_up": "notvalid"}
self.assertRaises(ValueError, wsme_json.fromjson, self._type,
body)
class TestLoadBalancerPOST(base.BaseTypesTest, TestLoadBalancer):
_type = lb_type.LoadBalancerPOST
def test_non_uuid_project_id(self):
body = {"name": "test_name", "description": "test_description",
"vip_subnet_id": uuidutils.generate_uuid(),
"project_id": "non-uuid"}
lb = wsme_json.fromjson(self._type, body)
self.assertEqual(lb.project_id, body['project_id'])
def test_vip(self):
body = {"vip_subnet_id": uuidutils.generate_uuid(),
"vip_port_id": uuidutils.generate_uuid()}
wsme_json.fromjson(self._type, body)
def test_invalid_ip_address(self):
body = {"vip_address": uuidutils.generate_uuid()}
self.assertRaises(exc.InvalidInput, wsme_json.fromjson, self._type,
body)
def test_invalid_port_id(self):
body = {"vip_port_id": "invalid_uuid"}
self.assertRaises(exc.InvalidInput, wsme_json.fromjson, self._type,
body)
def test_invalid_subnet_id(self):
body = {"vip_subnet_id": "invalid_uuid"}
self.assertRaises(exc.InvalidInput, wsme_json.fromjson, self._type,
body)
class TestLoadBalancerPUT(base.BaseTypesTest, TestLoadBalancer):
_type = lb_type.LoadBalancerPUT
def test_load_balancer(self):
body = {"name": "test_name", "description": "test_description"}
lb = wsme_json.fromjson(self._type, body)
self.assertEqual(wsme_types.Unset, lb.admin_state_up)