Merge "Implement provider drivers - L7 Policy"
This commit is contained in:
commit
9d069628c1
@ -213,7 +213,14 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
|
||||
self.client.cast({}, 'delete_l7policy', **payload)
|
||||
|
||||
def l7policy_update(self, l7policy):
|
||||
pass
|
||||
l7policy_dict = l7policy.to_dict()
|
||||
if 'admin_state_up' in l7policy_dict:
|
||||
l7policy_dict['enabled'] = l7policy_dict.pop('admin_state_up')
|
||||
l7policy_id = l7policy_dict.pop('l7policy_id')
|
||||
|
||||
payload = {consts.L7POLICY_ID: l7policy_id,
|
||||
consts.L7POLICY_UPDATES: l7policy_dict}
|
||||
self.client.cast({}, 'update_l7policy', **payload)
|
||||
|
||||
# L7 Rule
|
||||
def l7rule_create(self, l7rule):
|
||||
|
@ -305,17 +305,21 @@ def hm_dict_to_provider_dict(hm_dict):
|
||||
def db_l7policies_to_provider_l7policies(db_l7policies):
|
||||
provider_l7policies = []
|
||||
for l7policy in db_l7policies:
|
||||
new_l7policy_dict = l7policy_dict_to_provider_dict(
|
||||
l7policy.to_dict(recurse=True))
|
||||
if 'l7rules' in new_l7policy_dict:
|
||||
del new_l7policy_dict['l7rules']
|
||||
new_l7rules = db_l7rules_to_provider_l7rules(l7policy.l7rules)
|
||||
new_l7policy_dict['rules'] = new_l7rules
|
||||
provider_l7policies.append(
|
||||
driver_dm.L7Policy.from_dict(new_l7policy_dict))
|
||||
provider_l7policy = db_l7policy_to_provider_l7policy(l7policy)
|
||||
provider_l7policies.append(provider_l7policy)
|
||||
return provider_l7policies
|
||||
|
||||
|
||||
def db_l7policy_to_provider_l7policy(db_l7policy):
|
||||
new_l7policy_dict = l7policy_dict_to_provider_dict(
|
||||
db_l7policy.to_dict(recurse=True))
|
||||
if 'l7rules' in new_l7policy_dict:
|
||||
del new_l7policy_dict['l7rules']
|
||||
new_l7rules = db_l7rules_to_provider_l7rules(db_l7policy.l7rules)
|
||||
new_l7policy_dict['rules'] = new_l7rules
|
||||
return driver_dm.L7Policy.from_dict(new_l7policy_dict)
|
||||
|
||||
|
||||
def l7policy_dict_to_provider_dict(l7policy_dict):
|
||||
new_l7policy_dict = _base_to_provider_dict(l7policy_dict)
|
||||
new_l7policy_dict['l7policy_id'] = new_l7policy_dict.pop('id')
|
||||
|
@ -21,6 +21,9 @@ import pecan
|
||||
from wsme import types as wtypes
|
||||
from wsmeext import pecan as wsme_pecan
|
||||
|
||||
from octavia.api.drivers import data_models as driver_dm
|
||||
from octavia.api.drivers import driver_factory
|
||||
from octavia.api.drivers import utils as driver_utils
|
||||
from octavia.api.v2.controllers import base
|
||||
from octavia.api.v2.controllers import l7rule
|
||||
from octavia.api.v2.types import l7policy as l7policy_types
|
||||
@ -41,7 +44,6 @@ class L7PolicyController(base.BaseController):
|
||||
|
||||
def __init__(self):
|
||||
super(L7PolicyController, self).__init__()
|
||||
self.handler = self.handler.l7policy
|
||||
|
||||
@wsme_pecan.wsexpose(l7policy_types.L7PolicyRootResponse, wtypes.text,
|
||||
[wtypes.text], ignore_extra_args=True)
|
||||
@ -119,26 +121,6 @@ class L7PolicyController(base.BaseController):
|
||||
# do not give any information as to what constraint failed
|
||||
raise exceptions.InvalidOption(value='', option='')
|
||||
|
||||
def _send_l7policy_to_handler(self, session, db_l7policy, lb_id):
|
||||
try:
|
||||
LOG.info("Sending Creation of L7Policy %s to handler",
|
||||
db_l7policy.id)
|
||||
self.handler.create(db_l7policy)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception(
|
||||
reraise=False), db_api.get_lock_session() as lock_session:
|
||||
self._reset_lb_and_listener_statuses(
|
||||
lock_session, lb_id=lb_id,
|
||||
listener_id=db_l7policy.listener_id)
|
||||
# L7Policy now goes to ERROR
|
||||
self.repositories.l7policy.update(
|
||||
lock_session, db_l7policy.id,
|
||||
provisioning_status=constants.ERROR)
|
||||
db_l7policy = self._get_db_l7policy(session, db_l7policy.id)
|
||||
result = self._convert_db_to_type(db_l7policy,
|
||||
l7policy_types.L7PolicyResponse)
|
||||
return l7policy_types.L7PolicyRootResponse(l7policy=result)
|
||||
|
||||
@wsme_pecan.wsexpose(l7policy_types.L7PolicyRootResponse,
|
||||
body=l7policy_types.L7PolicyRootPOST, status_code=201)
|
||||
def post(self, l7policy_):
|
||||
@ -154,11 +136,15 @@ class L7PolicyController(base.BaseController):
|
||||
listener = self._get_db_listener(
|
||||
context.session, listener_id)
|
||||
load_balancer_id = listener.load_balancer_id
|
||||
l7policy.project_id = listener.project_id
|
||||
l7policy.project_id, provider = self._get_lb_project_id_provider(
|
||||
context.session, load_balancer_id)
|
||||
|
||||
self._auth_validate_action(context, l7policy.project_id,
|
||||
constants.RBAC_POST)
|
||||
|
||||
# Load the driver early as it also provides validation
|
||||
driver = driver_factory.get_driver(provider)
|
||||
|
||||
lock_session = db_api.get_session(autocommit=False)
|
||||
try:
|
||||
if self.repositories.check_quota_met(
|
||||
@ -177,13 +163,26 @@ class L7PolicyController(base.BaseController):
|
||||
listener_ids=[listener_id])
|
||||
db_l7policy = self._validate_create_l7policy(
|
||||
lock_session, l7policy_dict)
|
||||
|
||||
# Prepare the data for the driver data model
|
||||
provider_l7policy = (
|
||||
driver_utils.db_l7policy_to_provider_l7policy(db_l7policy))
|
||||
|
||||
# Dispatch to the driver
|
||||
LOG.info("Sending create L7 Policy %s to provider %s",
|
||||
db_l7policy.id, driver.name)
|
||||
driver_utils.call_provider(
|
||||
driver.name, driver.l7policy_create, provider_l7policy)
|
||||
|
||||
lock_session.commit()
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
lock_session.rollback()
|
||||
|
||||
return self._send_l7policy_to_handler(context.session, db_l7policy,
|
||||
lb_id=load_balancer_id)
|
||||
db_l7policy = self._get_db_l7policy(context.session, db_l7policy.id)
|
||||
result = self._convert_db_to_type(db_l7policy,
|
||||
l7policy_types.L7PolicyResponse)
|
||||
return l7policy_types.L7PolicyRootResponse(l7policy=result)
|
||||
|
||||
def _graph_create(self, lock_session, policy_dict):
|
||||
load_balancer_id = policy_dict.pop('load_balancer_id', None)
|
||||
@ -225,31 +224,42 @@ class L7PolicyController(base.BaseController):
|
||||
show_deleted=False)
|
||||
load_balancer_id, listener_id = self._get_listener_and_loadbalancer_id(
|
||||
db_l7policy)
|
||||
project_id, provider = self._get_lb_project_id_provider(
|
||||
context.session, load_balancer_id)
|
||||
|
||||
self._auth_validate_action(context, db_l7policy.project_id,
|
||||
constants.RBAC_PUT)
|
||||
self._auth_validate_action(context, project_id, constants.RBAC_PUT)
|
||||
|
||||
self._test_lb_and_listener_statuses(context.session,
|
||||
lb_id=load_balancer_id,
|
||||
listener_ids=[listener_id])
|
||||
self.repositories.l7policy.update(
|
||||
context.session, db_l7policy.id,
|
||||
provisioning_status=constants.PENDING_UPDATE)
|
||||
# Load the driver early as it also provides validation
|
||||
driver = driver_factory.get_driver(provider)
|
||||
|
||||
try:
|
||||
LOG.info("Sending Update of L7Policy %s to handler", id)
|
||||
self.handler.update(
|
||||
db_l7policy, sanitized_l7policy)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception(
|
||||
reraise=False), db_api.get_lock_session() as lock_session:
|
||||
self._reset_lb_and_listener_statuses(
|
||||
lock_session, lb_id=load_balancer_id,
|
||||
listener_id=db_l7policy.listener_id)
|
||||
# L7Policy now goes to ERROR
|
||||
self.repositories.l7policy.update(
|
||||
lock_session, db_l7policy.id,
|
||||
provisioning_status=constants.ERROR)
|
||||
with db_api.get_lock_session() as lock_session:
|
||||
|
||||
self._test_lb_and_listener_statuses(lock_session,
|
||||
lb_id=load_balancer_id,
|
||||
listener_ids=[listener_id])
|
||||
|
||||
# Prepare the data for the driver data model
|
||||
l7policy_dict = sanitized_l7policy.to_dict(render_unsets=False)
|
||||
l7policy_dict['id'] = id
|
||||
provider_l7policy_dict = (
|
||||
driver_utils.l7policy_dict_to_provider_dict(l7policy_dict))
|
||||
|
||||
# Dispatch to the driver
|
||||
LOG.info("Sending update L7 Policy %s to provider %s",
|
||||
id, driver.name)
|
||||
driver_utils.call_provider(
|
||||
driver.name, driver.l7policy_update,
|
||||
driver_dm.L7Policy.from_dict(provider_l7policy_dict))
|
||||
|
||||
# Update the database to reflect what the driver just accepted
|
||||
sanitized_l7policy.provisioning_status = constants.PENDING_UPDATE
|
||||
db_l7policy_dict = sanitized_l7policy.to_dict(render_unsets=False)
|
||||
self.repositories.l7policy.update(lock_session, id,
|
||||
**db_l7policy_dict)
|
||||
|
||||
# Force SQL alchemy to query the DB, otherwise we get inconsistent
|
||||
# results
|
||||
context.session.expire_all()
|
||||
db_l7policy = self._get_db_l7policy(context.session, id)
|
||||
result = self._convert_db_to_type(db_l7policy,
|
||||
l7policy_types.L7PolicyResponse)
|
||||
@ -263,34 +273,29 @@ class L7PolicyController(base.BaseController):
|
||||
show_deleted=False)
|
||||
load_balancer_id, listener_id = self._get_listener_and_loadbalancer_id(
|
||||
db_l7policy)
|
||||
project_id, provider = self._get_lb_project_id_provider(
|
||||
context.session, load_balancer_id)
|
||||
|
||||
self._auth_validate_action(context, db_l7policy.project_id,
|
||||
constants.RBAC_DELETE)
|
||||
self._auth_validate_action(context, project_id, constants.RBAC_DELETE)
|
||||
|
||||
if db_l7policy.provisioning_status == constants.DELETED:
|
||||
return
|
||||
|
||||
self._test_lb_and_listener_statuses(context.session,
|
||||
lb_id=load_balancer_id,
|
||||
listener_ids=[listener_id])
|
||||
self.repositories.l7policy.update(
|
||||
context.session, db_l7policy.id,
|
||||
provisioning_status=constants.PENDING_DELETE)
|
||||
# Load the driver early as it also provides validation
|
||||
driver = driver_factory.get_driver(provider)
|
||||
|
||||
try:
|
||||
LOG.info("Sending Deletion of L7Policy %s to handler",
|
||||
db_l7policy.id)
|
||||
self.handler.delete(db_l7policy)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception(
|
||||
reraise=False), db_api.get_lock_session() as lock_session:
|
||||
self._reset_lb_and_listener_statuses(
|
||||
lock_session, lb_id=load_balancer_id,
|
||||
listener_id=db_l7policy.listener_id)
|
||||
# L7Policy now goes to ERROR
|
||||
self.repositories.l7policy.update(
|
||||
lock_session, db_l7policy.id,
|
||||
provisioning_status=constants.ERROR)
|
||||
with db_api.get_lock_session() as lock_session:
|
||||
|
||||
self._test_lb_and_listener_statuses(lock_session,
|
||||
lb_id=load_balancer_id,
|
||||
listener_ids=[listener_id])
|
||||
self.repositories.l7policy.update(
|
||||
lock_session, db_l7policy.id,
|
||||
provisioning_status=constants.PENDING_DELETE)
|
||||
|
||||
LOG.info("Sending delete L7 Policy %s to provider %s",
|
||||
id, driver.name)
|
||||
driver_utils.call_provider(driver.name, driver.l7policy_delete, id)
|
||||
|
||||
@pecan.expose()
|
||||
def _lookup(self, l7policy_id, *remainder):
|
||||
|
@ -230,6 +230,7 @@ LISTENER_UPDATES = 'listener_updates'
|
||||
POOL_UPDATES = 'pool_updates'
|
||||
MEMBER_UPDATES = 'member_updates'
|
||||
HEALTH_MONITOR_UPDATES = 'health_monitor_updates'
|
||||
L7POLICY_UPDATES = 'l7policy_updates'
|
||||
|
||||
CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow'
|
||||
CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow'
|
||||
|
@ -21,6 +21,7 @@ from oslo_utils import uuidutils
|
||||
from octavia.common import constants
|
||||
import octavia.common.context
|
||||
from octavia.common import data_models
|
||||
from octavia.common import exceptions
|
||||
from octavia.tests.functional.api.v2 import base
|
||||
|
||||
|
||||
@ -631,16 +632,17 @@ class TestL7Policy(base.BaseAPITest):
|
||||
'redirect_url': 'bad url'}
|
||||
self.post(self.L7POLICIES_PATH, self._build_body(l7policy), status=400)
|
||||
|
||||
def test_create_with_bad_handler(self):
|
||||
self.handler_mock().l7policy.create.side_effect = Exception()
|
||||
api_l7policy = self.create_l7policy(self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT,
|
||||
).get(self.root_tag)
|
||||
self.assert_correct_status(
|
||||
lb_id=self.lb_id, listener_id=self.listener_id,
|
||||
l7policy_id=api_l7policy.get('id'),
|
||||
l7policy_prov_status=constants.ERROR,
|
||||
l7policy_op_status=constants.OFFLINE)
|
||||
@mock.patch('octavia.api.drivers.utils.call_provider')
|
||||
def test_create_with_bad_provider(self, mock_provider):
|
||||
mock_provider.side_effect = exceptions.ProviderDriverError(
|
||||
prov='bad_driver', user_msg='broken')
|
||||
l7policy = {'listener_id': self.listener_id,
|
||||
'action': constants.L7POLICY_ACTION_REDIRECT_TO_URL,
|
||||
'redirect_url': 'http://a.com'}
|
||||
response = self.post(self.L7POLICIES_PATH, self._build_body(l7policy),
|
||||
status=500)
|
||||
self.assertIn('Provider \'bad_driver\' reports error: broken',
|
||||
response.json.get('faultstring'))
|
||||
|
||||
def test_create_over_quota(self):
|
||||
self.start_quota_mock(data_models.L7Policy)
|
||||
@ -660,7 +662,7 @@ class TestL7Policy(base.BaseAPITest):
|
||||
response = self.put(self.L7POLICY_PATH.format(
|
||||
l7policy_id=api_l7policy.get('id')),
|
||||
self._build_body(new_l7policy)).json.get(self.root_tag)
|
||||
self.assertEqual(constants.L7POLICY_ACTION_REJECT,
|
||||
self.assertEqual(constants.L7POLICY_ACTION_REDIRECT_TO_URL,
|
||||
response.get('action'))
|
||||
self.assert_correct_status(
|
||||
lb_id=self.lb_id, listener_id=self.listener_id,
|
||||
@ -704,7 +706,7 @@ class TestL7Policy(base.BaseAPITest):
|
||||
self._build_body(new_l7policy)).json.get(self.root_tag)
|
||||
|
||||
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
|
||||
self.assertEqual(constants.L7POLICY_ACTION_REJECT,
|
||||
self.assertEqual(constants.L7POLICY_ACTION_REDIRECT_TO_URL,
|
||||
response.get('action'))
|
||||
self.assert_correct_status(
|
||||
lb_id=self.lb_id, listener_id=self.listener_id,
|
||||
@ -774,7 +776,8 @@ class TestL7Policy(base.BaseAPITest):
|
||||
l7policy_id=api_l7policy.get('id')),
|
||||
self._build_body(new_l7policy), status=400)
|
||||
|
||||
def test_update_with_bad_handler(self):
|
||||
@mock.patch('octavia.api.drivers.utils.call_provider')
|
||||
def test_update_with_bad_provider(self, mock_provider):
|
||||
api_l7policy = self.create_l7policy(self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT,
|
||||
).get(self.root_tag)
|
||||
@ -782,14 +785,13 @@ class TestL7Policy(base.BaseAPITest):
|
||||
new_l7policy = {
|
||||
'action': constants.L7POLICY_ACTION_REDIRECT_TO_URL,
|
||||
'redirect_url': 'http://www.example.com'}
|
||||
self.handler_mock().l7policy.update.side_effect = Exception()
|
||||
self.put(self.L7POLICY_PATH.format(
|
||||
mock_provider.side_effect = exceptions.ProviderDriverError(
|
||||
prov='bad_driver', user_msg='broken')
|
||||
response = self.put(self.L7POLICY_PATH.format(
|
||||
l7policy_id=api_l7policy.get('id')),
|
||||
self._build_body(new_l7policy))
|
||||
self.assert_correct_status(
|
||||
lb_id=self.lb_id, listener_id=self.listener_id,
|
||||
l7policy_id=api_l7policy.get('id'),
|
||||
l7policy_prov_status=constants.ERROR)
|
||||
self._build_body(new_l7policy), status=500)
|
||||
self.assertIn('Provider \'bad_driver\' reports error: broken',
|
||||
response.json.get('faultstring'))
|
||||
|
||||
def test_update_redirect_to_pool_bad_pool_id(self):
|
||||
api_l7policy = self.create_l7policy(self.listener_id,
|
||||
@ -935,7 +937,8 @@ class TestL7Policy(base.BaseAPITest):
|
||||
self.delete(self.L7POLICY_PATH.format(
|
||||
l7policy_id=uuidutils.generate_uuid()), status=404)
|
||||
|
||||
def test_delete_with_bad_handler(self):
|
||||
@mock.patch('octavia.api.drivers.utils.call_provider')
|
||||
def test_delete_with_bad_provider(self, mock_provider):
|
||||
api_l7policy = self.create_l7policy(self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT,
|
||||
).get(self.root_tag)
|
||||
@ -949,13 +952,10 @@ class TestL7Policy(base.BaseAPITest):
|
||||
self.assertIsNone(api_l7policy.pop('updated_at'))
|
||||
self.assertIsNotNone(response.pop('updated_at'))
|
||||
self.assertEqual(api_l7policy, response)
|
||||
self.handler_mock().l7policy.delete.side_effect = Exception()
|
||||
mock_provider.side_effect = exceptions.ProviderDriverError(
|
||||
prov='bad_driver', user_msg='broken')
|
||||
self.delete(self.L7POLICY_PATH.format(
|
||||
l7policy_id=api_l7policy.get('id')))
|
||||
self.assert_correct_status(
|
||||
lb_id=self.lb_id, listener_id=self.listener_id,
|
||||
l7policy_id=api_l7policy.get('id'),
|
||||
l7policy_prov_status=constants.ERROR)
|
||||
l7policy_id=api_l7policy.get('id')), status=500)
|
||||
|
||||
def test_create_when_lb_pending_update(self):
|
||||
self.create_l7policy(self.listener_id,
|
||||
|
Loading…
x
Reference in New Issue
Block a user