Implement provider drivers - Pool

This patch adds provider driver support to the Octavia v2
Pool API.

This patch also creates a provider driver for Octavia, fully
implementing the pool methods.

Story: 1655768
Task: 5165

Depends-On: https://review.openstack.org/565640
Change-Id: Id4bf209c202893c93b43fe723c02d03b8c207ab5
This commit is contained in:
Michael Johnson 2018-05-07 11:31:39 -07:00
parent 7a24b08434
commit 0acb622262
4 changed files with 114 additions and 100 deletions
octavia
api
drivers/amphora_driver
v2/controllers
common
tests/functional/api/v2

@ -109,7 +109,14 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
self.client.cast({}, 'delete_pool', **payload)
def pool_update(self, pool):
pass
pool_dict = pool.to_dict()
if 'admin_state_up' in pool_dict:
pool_dict['enabled'] = pool_dict.pop('admin_state_up')
pool_id = pool_dict.pop('pool_id')
payload = {consts.POOL_ID: pool_id,
consts.POOL_UPDATES: pool_dict}
self.client.cast({}, 'update_pool', **payload)
# Member
def member_create(self, member):

@ -22,6 +22,9 @@ import pecan
from wsme import types as wtypes
from wsmeext import pecan as wsme_pecan
from octavia.api.drivers import data_models as driver_dm
from octavia.api.drivers import driver_factory
from octavia.api.drivers import utils as driver_utils
from octavia.api.v2.controllers import base
from octavia.api.v2.controllers import health_monitor
from octavia.api.v2.controllers import member
@ -123,24 +126,6 @@ class PoolsController(base.BaseController):
# do not give any information as to what constraint failed
raise exceptions.InvalidOption(value='', option='')
def _send_pool_to_handler(self, session, db_pool, listener_id):
try:
LOG.info("Sending Creation of Pool %s to handler", db_pool.id)
self.handler.create(db_pool)
except Exception:
with excutils.save_and_reraise_exception(
reraise=False), db_api.get_lock_session() as lock_session:
self._reset_lb_and_listener_statuses(
lock_session, lb_id=db_pool.load_balancer_id,
listener_ids=[listener_id] if listener_id else [])
# Pool now goes to ERROR
self.repositories.pool.update(
lock_session, db_pool.id,
provisioning_status=constants.ERROR)
db_pool = self._get_db_pool(session, db_pool.id)
result = self._convert_db_to_type(db_pool, pool_types.PoolResponse)
return pool_types.PoolRootResponse(pool=result)
@wsme_pecan.wsexpose(pool_types.PoolRootResponse,
body=pool_types.PoolRootPOST, status_code=201)
def post(self, pool_):
@ -157,13 +142,14 @@ class PoolsController(base.BaseController):
context = pecan.request.context.get('octavia_context')
if pool.loadbalancer_id:
pool.project_id = self._get_lb_project_id(context.session,
pool.loadbalancer_id)
pool.project_id, provider = self._get_lb_project_id_provider(
context.session, pool.loadbalancer_id)
elif pool.listener_id:
listener = self.repositories.listener.get(
context.session, id=pool.listener_id)
pool.project_id = listener.project_id
pool.loadbalancer_id = listener.load_balancer_id
pool.project_id, provider = self._get_lb_project_id_provider(
context.session, pool.loadbalancer_id)
else:
msg = _("Must provide at least one of: "
"loadbalancer_id, listener_id")
@ -176,6 +162,9 @@ class PoolsController(base.BaseController):
sp_dict = pool.session_persistence.to_dict(render_unsets=False)
validate.check_session_persistence(sp_dict)
# Load the driver early as it also provides validation
driver = driver_factory.get_driver(provider)
lock_session = db_api.get_session(autocommit=False)
try:
if self.repositories.check_quota_met(
@ -201,13 +190,25 @@ class PoolsController(base.BaseController):
db_pool = self._validate_create_pool(
lock_session, pool_dict, listener_id)
# Prepare the data for the driver data model
provider_pool = (
driver_utils.db_pool_to_provider_pool(db_pool))
# Dispatch to the driver
LOG.info("Sending create Pool %s to provider %s",
db_pool.id, driver.name)
driver_utils.call_provider(
driver.name, driver.pool_create, provider_pool)
lock_session.commit()
except Exception:
with excutils.save_and_reraise_exception():
lock_session.rollback()
return self._send_pool_to_handler(context.session, db_pool,
listener_id=listener_id)
db_pool = self._get_db_pool(context.session, db_pool.id)
result = self._convert_db_to_type(db_pool, pool_types.PoolResponse)
return pool_types.PoolRootResponse(pool=result)
def _graph_create(self, session, lock_session, pool_dict):
load_balancer_id = pool_dict['load_balancer_id']
@ -257,33 +258,44 @@ class PoolsController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_pool = self._get_db_pool(context.session, id, show_deleted=False)
self._auth_validate_action(context, db_pool.project_id,
constants.RBAC_PUT)
project_id, provider = self._get_lb_project_id_provider(
context.session, db_pool.load_balancer_id)
self._auth_validate_action(context, project_id, constants.RBAC_PUT)
if pool.session_persistence:
sp_dict = pool.session_persistence.to_dict(render_unsets=False)
validate.check_session_persistence(sp_dict)
self._test_lb_and_listener_statuses(
context.session, lb_id=db_pool.load_balancer_id,
listener_ids=self._get_affected_listener_ids(db_pool))
self.repositories.pool.update(
context.session, db_pool.id,
provisioning_status=constants.PENDING_UPDATE)
# Load the driver early as it also provides validation
driver = driver_factory.get_driver(provider)
try:
LOG.info("Sending Update of Pool %s to handler", id)
self.handler.update(db_pool, pool)
except Exception:
with excutils.save_and_reraise_exception(
reraise=False), db_api.get_lock_session() as lock_session:
self._reset_lb_and_listener_statuses(
lock_session, lb_id=db_pool.load_balancer_id,
listener_ids=self._get_affected_listener_ids(db_pool))
# Pool now goes to ERROR
self.repositories.pool.update(
lock_session, db_pool.id,
provisioning_status=constants.ERROR)
with db_api.get_lock_session() as lock_session:
self._test_lb_and_listener_statuses(
context.session, lb_id=db_pool.load_balancer_id,
listener_ids=self._get_affected_listener_ids(db_pool))
# Prepare the data for the driver data model
pool_dict = pool.to_dict(render_unsets=False)
pool_dict['id'] = id
provider_pool_dict = (
driver_utils.pool_dict_to_provider_dict(pool_dict))
# Dispatch to the driver
LOG.info("Sending update Pool %s to provider %s", id, driver.name)
driver_utils.call_provider(
driver.name, driver.pool_update,
driver_dm.Pool.from_dict(provider_pool_dict))
# Update the database to reflect what the driver just accepted
pool.provisioning_status = constants.PENDING_UPDATE
db_pool_dict = pool.to_dict(render_unsets=False)
self.repositories.update_pool_and_sp(lock_session, id,
db_pool_dict)
# Force SQL alchemy to query the DB, otherwise we get inconsistent
# results
context.session.expire_all()
db_pool = self._get_db_pool(context.session, id)
result = self._convert_db_to_type(db_pool, pool_types.PoolResponse)
return pool_types.PoolRootResponse(pool=result)
@ -297,29 +309,24 @@ class PoolsController(base.BaseController):
raise exceptions.PoolInUseByL7Policy(
id=db_pool.id, l7policy_id=db_pool.l7policies[0].id)
self._auth_validate_action(context, db_pool.project_id,
constants.RBAC_DELETE)
project_id, provider = self._get_lb_project_id_provider(
context.session, db_pool.load_balancer_id)
self._test_lb_and_listener_statuses(
context.session, lb_id=db_pool.load_balancer_id,
listener_ids=self._get_affected_listener_ids(db_pool))
self.repositories.pool.update(
context.session, db_pool.id,
provisioning_status=constants.PENDING_DELETE)
self._auth_validate_action(context, project_id, constants.RBAC_DELETE)
try:
LOG.info("Sending Deletion of Pool %s to handler", db_pool.id)
self.handler.delete(db_pool)
except Exception:
with excutils.save_and_reraise_exception(
reraise=False), db_api.get_lock_session() as lock_session:
self._reset_lb_and_listener_statuses(
lock_session, lb_id=db_pool.load_balancer_id,
listener_ids=self._get_affected_listener_ids(db_pool))
# Pool now goes to ERROR
self.repositories.pool.update(
lock_session, db_pool.id,
provisioning_status=constants.ERROR)
# Load the driver early as it also provides validation
driver = driver_factory.get_driver(provider)
with db_api.get_lock_session() as lock_session:
self._test_lb_and_listener_statuses(
lock_session, lb_id=db_pool.load_balancer_id,
listener_ids=self._get_affected_listener_ids(db_pool))
self.repositories.pool.update(
lock_session, db_pool.id,
provisioning_status=constants.PENDING_DELETE)
LOG.info("Sending delete Pool %s to provider %s", id, driver.name)
driver_utils.call_provider(driver.name, driver.pool_delete, id)
@pecan.expose()
def _lookup(self, pool_id, *remainder):

@ -227,6 +227,7 @@ L7POLICY_ID = 'l7policy_id'
L7RULE_ID = 'l7rule_id'
LOAD_BALANCER_UPDATES = 'load_balancer_updates'
LISTENER_UPDATES = 'listener_updates'
POOL_UPDATES = 'pool_updates'
CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow'
CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow'

@ -21,6 +21,7 @@ from oslo_utils import uuidutils
from octavia.common import constants
import octavia.common.context
from octavia.common import data_models
from octavia.common import exceptions
from octavia.tests.functional.api.v2 import base
@ -725,20 +726,19 @@ class TestPool(base.BaseAPITest):
'lb_algorithm': constants.LB_ALGORITHM_ROUND_ROBIN}
self.post(self.POOLS_PATH, self._build_body(lb_pool), status=400)
def test_create_with_bad_handler(self):
self.handler_mock().pool.create.side_effect = Exception()
api_pool = self.create_pool(
self.lb_id,
constants.PROTOCOL_HTTP,
constants.LB_ALGORITHM_ROUND_ROBIN,
listener_id=self.listener_id).get(self.root_tag)
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
pool_id=api_pool.get('id'),
lb_prov_status=constants.ACTIVE,
listener_prov_status=constants.ACTIVE,
pool_prov_status=constants.ERROR,
pool_op_status=constants.OFFLINE)
@mock.patch('octavia.api.drivers.utils.call_provider')
def test_create_with_bad_provider(self, mock_provider):
mock_provider.side_effect = exceptions.ProviderDriverError(
prov='bad_driver', user_msg='broken')
lb_pool = {
'loadbalancer_id': self.lb_id,
'protocol': constants.PROTOCOL_HTTP,
'lb_algorithm': constants.LB_ALGORITHM_ROUND_ROBIN,
'project_id': self.project_id}
response = self.post(self.POOLS_PATH, self._build_body(lb_pool),
status=500)
self.assertIn('Provider \'bad_driver\' reports error: broken',
response.json.get('faultstring'))
def test_create_over_quota(self):
self.start_quota_mock(data_models.Pool)
@ -768,7 +768,7 @@ class TestPool(base.BaseAPITest):
self.set_lb_status(self.lb_id)
response = self.get(self.POOL_PATH.format(
pool_id=api_pool.get('id'))).json.get(self.root_tag)
self.assertNotEqual('new_name', response.get('name'))
self.assertEqual('new_name', response.get('name'))
self.assertIsNotNone(response.get('created_at'))
self.assertIsNotNone(response.get('updated_at'))
self.assert_correct_status(
@ -820,7 +820,7 @@ class TestPool(base.BaseAPITest):
self.set_lb_status(self.lb_id)
response = self.get(self.POOL_PATH.format(
pool_id=api_pool.get('id'))).json.get(self.root_tag)
self.assertNotEqual('new_name', response.get('name'))
self.assertEqual('new_name', response.get('name'))
self.assertIsNotNone(response.get('created_at'))
self.assertIsNotNone(response.get('updated_at'))
self.assert_correct_status(
@ -864,7 +864,8 @@ class TestPool(base.BaseAPITest):
lb_id=self.lb_id, listener_id=self.listener_id,
pool_id=api_pool.get('id'))
def test_update_with_bad_handler(self):
@mock.patch('octavia.api.drivers.utils.call_provider')
def test_update_with_bad_provider(self, mock_provider):
api_pool = self.create_pool(
self.lb_id,
constants.PROTOCOL_HTTP,
@ -872,13 +873,12 @@ class TestPool(base.BaseAPITest):
listener_id=self.listener_id).get(self.root_tag)
self.set_lb_status(lb_id=self.lb_id)
new_pool = {'name': 'new_name'}
self.handler_mock().pool.update.side_effect = Exception()
self.put(self.POOL_PATH.format(pool_id=api_pool.get('id')),
self._build_body(new_pool))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
pool_id=api_pool.get('id'),
pool_prov_status=constants.ERROR)
mock_provider.side_effect = exceptions.ProviderDriverError(
prov='bad_driver', user_msg='broken')
response = self.put(self.POOL_PATH.format(pool_id=api_pool.get('id')),
self._build_body(new_pool), status=500)
self.assertIn('Provider \'bad_driver\' reports error: broken',
response.json.get('faultstring'))
def test_delete(self):
api_pool = self.create_pool(
@ -1005,7 +1005,8 @@ class TestPool(base.BaseAPITest):
self.delete(self.POOL_PATH.format(
pool_id=api_pool.get('id')), status=409)
def test_delete_with_bad_handler(self):
@mock.patch('octavia.api.drivers.utils.call_provider')
def test_delete_with_bad_provider(self, mock_provider):
api_pool = self.create_pool(
self.lb_id,
constants.PROTOCOL_HTTP,
@ -1021,12 +1022,10 @@ class TestPool(base.BaseAPITest):
self.assertIsNone(api_pool.pop('updated_at'))
self.assertIsNotNone(response.pop('updated_at'))
self.assertEqual(api_pool, response)
self.handler_mock().pool.delete.side_effect = Exception()
self.delete(self.POOL_PATH.format(pool_id=api_pool.get('id')))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
pool_id=api_pool.get('id'),
pool_prov_status=constants.ERROR)
mock_provider.side_effect = exceptions.ProviderDriverError(
prov='bad_driver', user_msg='broken')
self.delete(self.POOL_PATH.format(pool_id=api_pool.get('id')),
status=500)
def test_create_with_session_persistence(self):
sp = {"type": constants.SESSION_PERSISTENCE_APP_COOKIE,
@ -1125,7 +1124,7 @@ class TestPool(base.BaseAPITest):
self._build_body(new_pool))
response = self.get(self.POOL_PATH.format(
pool_id=api_pool.get('id'))).json.get(self.root_tag)
self.assertNotEqual(sp, response.get('session_persistence'))
self.assertEqual(sp, response.get('session_persistence'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
pool_id=api_pool.get('id'),
@ -1154,7 +1153,7 @@ class TestPool(base.BaseAPITest):
self._build_body(new_pool))
response = self.get(self.POOL_PATH.format(
pool_id=api_pool.get('id'))).json.get(self.root_tag)
self.assertNotEqual(sess_p, response.get('session_persistence'))
self.assertEqual(sess_p, response.get('session_persistence'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
pool_id=api_pool.get('id'),
@ -1313,7 +1312,7 @@ class TestPool(base.BaseAPITest):
new_sp = {"pool": {"session_persistence": None}}
response = self.put(self.POOL_PATH.format(
pool_id=api_pool.get('id')), new_sp).json.get(self.root_tag)
self.assertIsNotNone(response.get('session_persistence'))
self.assertIsNone(response.get('session_persistence'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
pool_id=api_pool.get('id'),