Adds the ability to failover a load balancer

This will allow an operator to force the failover of a load
balancer's underlying amphora for upgrades or other
maintenance.

- Adds a new failover endpoint to the queue
- Adds the functionality to the worker
- Adds the failover command to the producer
- Adds a failover controller so
  /loadbalancer/123/failover will initiate
  a failover and return 202
- Adds logic to insert the server group into the
  failover flow

Change-Id: Ic4698066773828ae37b55a8d79bd2df6fc6624be
Author: German Eichberger 2017-03-10 10:18:31 -05:00 (committed by Michael Johnson)
Commit: 01e5af1a01 (parent: 6c1d424776)
15 changed files with 439 additions and 35 deletions

View File

@@ -0,0 +1 @@
curl -X PUT -H "X-Auth-Token: <token>" http://198.51.100.10:9876/v2.0/lbaas/loadbalancers/4a13c573-623c-4d23-8a9c-581dc17ceb1f/failover
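For reference, here is a minimal Python equivalent of the curl request above. It is illustrative only (not part of this change); the endpoint, token, and load balancer ID are the same placeholders used in the curl example.

# Illustrative sketch: the same failover request issued with the Python
# requests library. "<token>" and the endpoint/ID below are placeholders.
import requests

url = ("http://198.51.100.10:9876/v2.0/lbaas/loadbalancers/"
       "4a13c573-623c-4d23-8a9c-581dc17ceb1f/failover")
response = requests.put(url, headers={"X-Auth-Token": "<token>"})
# A successful failover request returns 202 Accepted with an empty body.
print(response.status_code)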

View File

@@ -550,3 +550,43 @@ Response Example
.. literalinclude:: examples/loadbalancer-status-response.json
:language: javascript
Failover a load balancer
========================
.. rest_method:: PUT /v2.0/lbaas/loadbalancers/{loadbalancer_id}/failover
Performs a failover of a load balancer.
This operation is only available to users with load balancer administrative
rights.
.. rest_status_code:: success ../http-status.yaml
- 202
.. rest_status_code:: error ../http-status.yaml
- 401
- 403
- 404
- 409
- 500
Request
-------
.. rest_parameters:: ../parameters.yaml
- loadbalancer_id: path-loadbalancer-id
Curl Example
------------
.. literalinclude:: examples/loadbalancer-failover-curl
:language: bash
Response
--------
There is no body content for the response of a successful failover request.

View File

@@ -245,7 +245,7 @@ def simulate_controller(data_model, delete=False, update=False, create=False):
LOG.info("Simulated Controller Handler Thread Complete")
def loadbalancer_controller(loadbalancer, delete=False, update=False,
create=False):
create=False, failover=False):
time.sleep(ASYNC_TIME)
LOG.info("Simulating controller operation for loadbalancer...")
@@ -264,6 +264,11 @@ def simulate_controller(data_model, delete=False, update=False, create=False):
repo.load_balancer.update(db_api.get_session(), id=loadbalancer.id,
operating_status=constants.ONLINE,
provisioning_status=constants.ACTIVE)
elif failover:
repo.load_balancer.update(
db_api.get_session(), id=loadbalancer.id,
operating_status=constants.ONLINE,
provisioning_status=constants.PENDING_UPDATE)
LOG.info("Simulated Controller Handler Thread Complete")
controller = loadbalancer_controller

View File

@@ -108,6 +108,17 @@ class LoadBalancerProducer(BaseProducer):
method_name = "delete_{0}".format(self.payload_class)
self.client.cast({}, method_name, **kw)
def failover(self, data_model):
"""sends a failover message to the controller via oslo.messaging
:param data_model:
"""
model_id = getattr(data_model, 'id', None)
p_class = self.payload_class
kw = {"{0}_id".format(p_class): model_id}
method_name = "failover_{0}".format(self.payload_class)
self.client.cast({}, method_name, **kw)
class ListenerProducer(BaseProducer):
"""Sends updates,deletes and creates to the RPC end of the queue consumer

View File

@@ -457,13 +457,16 @@ class LoadBalancersController(base.BaseController):
the request to the StatusesController.
"""
if id and len(remainder) and (remainder[0] == 'status' or
remainder[0] == 'stats'):
remainder[0] == 'stats' or
remainder[0] == 'failover'):
controller = remainder[0]
remainder = remainder[1:]
if controller == 'status':
return StatusController(lb_id=id), remainder
elif controller == 'stats':
return StatisticsController(lb_id=id), remainder
elif controller == 'failover':
return FailoverController(lb_id=id), remainder
class StatusController(base.BaseController):
@@ -519,3 +522,30 @@ class StatisticsController(base.BaseController, stats.StatsMixin):
result = self._convert_db_to_type(
lb_stats, lb_types.LoadBalancerStatisticsResponse)
return lb_types.StatisticsRootResponse(stats=result)
class FailoverController(LoadBalancersController):
def __init__(self, lb_id):
super(FailoverController, self).__init__()
self.lb_id = lb_id
@wsme_pecan.wsexpose(None, wtypes.text, status_code=202)
def put(self, **kwargs):
"""Fails over a loadbalancer"""
context = pecan.request.context.get('octavia_context')
db_lb = self._get_db_lb(context.session, self.lb_id)
self._auth_validate_action(context, db_lb.project_id,
constants.RBAC_PUT_FAILOVER)
self._test_lb_status(context.session, self.lb_id)
try:
LOG.info("Sending failover request for lb %s to the handler",
self.lb_id)
self.handler.failover(db_lb)
except Exception:
with excutils.save_and_reraise_exception(reraise=False):
self.repositories.load_balancer.update(
context.session, self.lb_id,
provisioning_status=constants.ERROR)

View File

@@ -297,6 +297,7 @@ RPC_NAMESPACE_CONTROLLER_AGENT = 'controller'
LB_CREATE_FAILOVER_PRIORITY = 20
LB_CREATE_NORMAL_PRIORITY = 40
LB_CREATE_SPARES_POOL_PRIORITY = 60
LB_CREATE_ADMIN_FAILOVER_PRIORITY = 80
BUILD_TYPE_PRIORITY = 'build_type_priority'
# Active standalone roles and topology
@@ -434,6 +435,7 @@ DEFAULT_PAGE_SIZE = 1000
# RBAC
LOADBALANCER_API = 'os_load-balancer_api'
RULE_API_ADMIN = 'rule:load-balancer:admin'
RULE_API_READ = 'rule:load-balancer:read'
RULE_API_READ_GLOBAL = 'rule:load-balancer:read-global'
RULE_API_WRITE = 'rule:load-balancer:write'
@@ -450,6 +452,7 @@ RBAC_L7RULE = '{}:l7rule:'.format(LOADBALANCER_API)
RBAC_QUOTA = '{}:quota:'.format(LOADBALANCER_API)
RBAC_POST = 'post'
RBAC_PUT = 'put'
RBAC_PUT_FAILOVER = 'put_failover'
RBAC_DELETE = 'delete'
RBAC_GET_ONE = 'get_one'
RBAC_GET_ALL = 'get_all'
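These per-action constants are concatenated with an object prefix when the policies are registered and enforced. For the new failover action the resulting policy name works out as sketched below; RBAC_LOADBALANCER is not visible in this hunk, so its value here is an assumption modeled on the RBAC_L7RULE and RBAC_QUOTA constants above.

# Sketch of how the constants compose into the policy action checked by the
# FailoverController. RBAC_LOADBALANCER is assumed, not shown in this diff.
LOADBALANCER_API = 'os_load-balancer_api'
RBAC_LOADBALANCER = '{}:loadbalancer:'.format(LOADBALANCER_API)  # assumed
RBAC_PUT_FAILOVER = 'put_failover'

print(RBAC_LOADBALANCER + RBAC_PUT_FAILOVER)
# os_load-balancer_api:loadbalancer:put_failover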

View File

@@ -53,6 +53,11 @@ class Endpoint(object):
LOG.info('Deleting load balancer \'%s\'...', load_balancer_id)
self.worker.delete_load_balancer(load_balancer_id, cascade)
def failover_load_balancer(self, context, load_balancer_id):
LOG.info('Failing over amphora in load balancer \'%s\'...',
load_balancer_id)
self.worker.failover_loadbalancer(load_balancer_id)
def create_listener(self, context, listener_id):
LOG.info('Creating listener \'%s\'...', listener_id)
self.worker.create_listener(listener_id)

View File

@@ -613,21 +613,23 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
log=LOG):
update_l7rule_tf.run()
def failover_amphora(self, amphora_id):
"""Perform failover operations for an amphora.
def _perform_amphora_failover(self, amp, priority):
"""Internal method to perform failover operations for an amphora.
:param amphora_id: ID for amphora to failover
:param amp: The amphora to failover
:param priority: The create priority
:returns: None
:raises AmphoraNotFound: The referenced amphora was not found
"""
try:
amp = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
stored_params = {constants.FAILED_AMPHORA: amp,
constants.LOADBALANCER_ID: amp.load_balancer_id,
constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_FAILOVER_PRIORITY}
constants.BUILD_TYPE_PRIORITY: priority, }
if (CONF.house_keeping.spare_amphora_pool_size == 0) and (
CONF.nova.enable_anti_affinity is False):
LOG.warning("Failing over amphora with no spares pool may "
"cause delays in failover times while a new "
"amphora instance boots.")
# if we run with anti-affinity we need to set the server group
# as well
@@ -639,20 +641,75 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
lb[0].server_group_id)
failover_amphora_tf = self._taskflow_load(
self._amphora_flows.get_failover_flow(
role=amp.role,
self._amphora_flows.get_failover_flow(role=amp.role,
status=amp.status),
store=stored_params)
with tf_logging.DynamicLoggingListener(
failover_amphora_tf, log=LOG,
hide_inputs_outputs_of=self._exclude_result_logging_tasks):
failover_amphora_tf.run()
def failover_amphora(self, amphora_id):
"""Perform failover operations for an amphora.
:param amphora_id: ID for amphora to failover
:returns: None
:raises AmphoraNotFound: The referenced amphora was not found
"""
try:
amp = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
self._perform_amphora_failover(
amp, constants.LB_CREATE_FAILOVER_PRIORITY)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error("Failover exception: %s", e)
def failover_loadbalancer(self, load_balancer_id):
"""Perform failover operations for a load balancer.
:param load_balancer_id: ID for load balancer to failover
:returns: None
:raises LBNotFound: The referenced load balancer was not found
"""
# This is a bit pedestrian, but should be sufficient for now.
try:
lb = self._lb_repo.get(db_apis.get_session(),
id=load_balancer_id)
self._lb_repo.update(db_apis.get_session(), load_balancer_id,
provisioning_status=constants.PENDING_UPDATE)
amps = lb.amphorae
for amp in amps:
# failover amphora in backup role
# Note: this amp may not currently be the backup
# TODO(johnsom) Change this to query the amp state
# once the amp API supports it.
if amp.role == constants.ROLE_BACKUP:
self._perform_amphora_failover(
amp, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY)
for amp in amps:
# failover everything else
if amp.role != constants.ROLE_BACKUP:
self._perform_amphora_failover(
amp, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY)
self._lb_repo.update(
db_apis.get_session(), load_balancer_id,
provisioning_status=constants.ACTIVE)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error("LB %(lbid)s failover exception: %(exc)s",
{'lbid': load_balancer_id, 'exc': e})
self._lb_repo.update(
db_apis.get_session(), load_balancer_id,
provisioning_status=constants.ERROR)
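The two passes over lb.amphorae above fail over any amphora in the BACKUP role before the rest, presumably so that the amphora currently carrying traffic is replaced last. A toy, self-contained illustration of that ordering (the IDs and the MASTER role value are invented for the sketch; ROLE_BACKUP mirrors the constant used above):

# Toy illustration of the two-pass ordering in failover_loadbalancer:
# BACKUP amphorae are failed over first, then everything else.
ROLE_BACKUP = 'BACKUP'
amphorae = [{'id': 'amp-master', 'role': 'MASTER'},
            {'id': 'amp-backup', 'role': ROLE_BACKUP}]

order = [a['id'] for a in amphorae if a['role'] == ROLE_BACKUP]
order += [a['id'] for a in amphorae if a['role'] != ROLE_BACKUP]
print(order)  # ['amp-backup', 'amp-master']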
def amphora_cert_rotation(self, amphora_id):
"""Perform cert rotation for an amphora.

View File

@@ -297,6 +297,17 @@ class AmphoraFlows(object):
database_tasks.TestLBStatusSetPendingInDB(
requires=constants.LOADBALANCER_ID))
# Note: It seems intuitive to boot an amphora prior to deleting
# the old amphora, however this is a complicated issue.
# If the target host (due to anti-affinity) is resource
# constrained, this will fail where a post-delete will
# succeed. Since this is async with the API it would result
# in the LB ending in ERROR though the amps are still alive.
# Consider in the future making this a complicated
# try-on-failure-retry flow, or move upgrade failovers to be
# synchronous with the API. For now spares pool and act/stdby
# will mitigate most of this delay.
# Delete the old amphora
failover_amphora_flow.add(
database_tasks.MarkAmphoraPendingDeleteInDB(

View File

@@ -479,7 +479,7 @@ class AssociateFailoverAmphoraWithLBID(BaseDatabaseTask):
class MapLoadbalancerToAmphora(BaseDatabaseTask):
"""Maps and assigns a load balancer to an amphora in the database."""
def execute(self, loadbalancer_id):
def execute(self, loadbalancer_id, server_group_id=None):
"""Allocates an Amphora for the load balancer in the database.
:param loadbalancer_id: The load balancer id to map to an amphora
@@ -490,6 +490,11 @@ class MapLoadbalancerToAmphora(BaseDatabaseTask):
LOG.debug("Allocating an Amphora for load balancer with id %s",
loadbalancer_id)
if server_group_id is not None:
LOG.debug("Load balancer is using anti-affinity. Skipping spares "
"pool allocation.")
return None
amp = self.amphora_repo.allocate_and_associate(
db_apis.get_session(),
loadbalancer_id)

View File

@@ -40,6 +40,10 @@ rules = [
policy.RuleDefault('load-balancer:owner', 'project_id:%(project_id)s'),
# API access roles
policy.RuleDefault('load-balancer:admin', 'is_admin:True or '
'role:admin or '
'role:load-balancer_admin'),
policy.RuleDefault('load-balancer:observer_and_owner',
'role:load-balancer_observer and '
'rule:load-balancer:owner'),
@@ -55,29 +59,32 @@ rules = [
policy.RuleDefault('load-balancer:read',
'rule:load-balancer:observer_and_owner or '
'rule:load-balancer:global_observer or '
'rule:load-balancer:member_and_owner or is_admin:True'),
'rule:load-balancer:member_and_owner or '
'rule:load-balancer:admin'),
policy.RuleDefault('load-balancer:read-global',
'rule:load-balancer:global_observer or '
'is_admin:True'),
'rule:load-balancer:admin'),
policy.RuleDefault('load-balancer:write',
'rule:load-balancer:member_and_owner or is_admin:True'),
'rule:load-balancer:member_and_owner or '
'rule:load-balancer:admin'),
policy.RuleDefault('load-balancer:read-quota',
'rule:load-balancer:observer_and_owner or '
'rule:load-balancer:global_observer or '
'rule:load-balancer:member_and_owner or '
'role:load-balancer_quota_admin or '
'is_admin:True'),
'rule:load-balancer:admin'),
policy.RuleDefault('load-balancer:read-quota-global',
'rule:load-balancer:global_observer or '
'role:load-balancer_quota_admin or '
'is_admin:True'),
'rule:load-balancer:admin'),
policy.RuleDefault('load-balancer:write-quota',
'role:load-balancer_quota_admin or is_admin:True'),
'role:load-balancer_quota_admin or '
'rule:load-balancer:admin'),
]
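Note the refactor in this hunk: rules that previously ended with a bare 'is_admin:True' now delegate to 'rule:load-balancer:admin', so the load-balancer_admin role (not only Keystone admin) satisfies them. Below is a stand-alone oslo.policy sketch of the effect on the write rule; it registers only a handful of rules, the enforcer wiring is simplified (real Octavia goes through its common policy module), and the 'member_and_owner' definition is an assumption modeled on the 'observer_and_owner' rule shown above.

# Minimal oslo.policy sketch: a user holding load-balancer_admin passes
# 'load-balancer:write' on another project's resource, because the rule
# now falls through to 'rule:load-balancer:admin'.
from oslo_config import cfg
from oslo_policy import policy

enforcer = policy.Enforcer(cfg.CONF)
enforcer.register_defaults([
    policy.RuleDefault('load-balancer:owner', 'project_id:%(project_id)s'),
    policy.RuleDefault('load-balancer:admin',
                       'is_admin:True or role:admin or '
                       'role:load-balancer_admin'),
    policy.RuleDefault('load-balancer:member_and_owner',  # assumed definition
                       'role:load-balancer_member and '
                       'rule:load-balancer:owner'),
    policy.RuleDefault('load-balancer:write',
                       'rule:load-balancer:member_and_owner or '
                       'rule:load-balancer:admin'),
])

creds = {'roles': ['load-balancer_admin'], 'project_id': 'project-a'}
target = {'project_id': 'project-b'}
print(enforcer.enforce('load-balancer:write', target, creds))  # True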

View File

@@ -76,6 +76,14 @@ rules = [
[{'method': 'GET',
'path': '/v2.0/lbaas/loadbalancers/{loadbalancer_id}/status'}]
),
policy.DocumentedRuleDefault(
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_LOADBALANCER,
action=constants.RBAC_PUT_FAILOVER),
constants.RULE_API_ADMIN,
"Failover a Load Balancer",
[{'method': 'PUT',
'path': '/v2.0/lbaas/loadbalancers/{loadbalancer_id}/failover'}]
),
]

View File

@@ -1387,6 +1387,179 @@ class TestLoadBalancer(base.BaseAPITest):
path = self.LB_PATH.format(lb_id='bad_uuid')
self.delete(path, status=404)
def test_failover(self):
project_id = uuidutils.generate_uuid()
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
project_id=project_id,
description='desc1',
admin_state_up=False)
lb_dict = lb.get(self.root_tag)
lb = self.set_lb_status(lb_dict.get('id'))
self.app.put(self._get_full_path(
self.LB_PATH.format(lb_id=lb_dict.get('id')) + "/failover"),
status=202)
def test_failover_pending(self):
project_id = uuidutils.generate_uuid()
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
project_id=project_id,
description='desc1',
admin_state_up=False)
lb_dict = lb.get(self.root_tag)
lb = self.set_lb_status(lb_dict.get('id'),
status=constants.PENDING_UPDATE)
self.app.put(self._get_full_path(
self.LB_PATH.format(lb_id=lb_dict.get('id')) + "/failover"),
status=409)
def test_failover_error(self):
project_id = uuidutils.generate_uuid()
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
project_id=project_id,
description='desc1',
admin_state_up=False)
lb_dict = lb.get(self.root_tag)
lb = self.set_lb_status(lb_dict.get('id'),
status=constants.ERROR)
self.app.put(self._get_full_path(
self.LB_PATH.format(lb_id=lb_dict.get('id')) + "/failover"),
status=409)
def test_failover_not_authorized(self):
project_id = uuidutils.generate_uuid()
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
project_id=project_id,
description='desc1',
admin_state_up=False)
lb_dict = lb.get(self.root_tag)
lb = self.set_lb_status(lb_dict.get('id'))
path = self._get_full_path(self.LB_PATH.format(
lb_id=lb_dict.get('id')) + "/failover")
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
uuidutils.generate_uuid()):
response = self.app.put(path, status=403)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, response.json)
def test_failover_not_authorized_no_role(self):
project_id = uuidutils.generate_uuid()
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
project_id=project_id,
description='desc1',
admin_state_up=False)
lb_dict = lb.get(self.root_tag)
lb = self.set_lb_status(lb_dict.get('id'))
path = self._get_full_path(self.LB_PATH.format(
lb_id=lb_dict.get('id')) + "/failover")
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
uuidutils.generate_uuid()):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': [],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
response = self.app.put(path, status=403)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, response.json)
def test_failover_authorized_lb_admin(self):
project_id = uuidutils.generate_uuid()
project_id_2 = uuidutils.generate_uuid()
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
project_id=project_id,
description='desc1',
admin_state_up=False)
lb_dict = lb.get(self.root_tag)
lb = self.set_lb_status(lb_dict.get('id'))
path = self._get_full_path(self.LB_PATH.format(
lb_id=lb_dict.get('id')) + "/failover")
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
project_id_2):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_admin'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': project_id_2}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
self.app.put(path, status=202)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
def test_failover_authorized_no_auth(self):
project_id = uuidutils.generate_uuid()
project_id_2 = uuidutils.generate_uuid()
lb = self.create_load_balancer(uuidutils.generate_uuid(),
name='lb1',
project_id=project_id,
description='desc1',
admin_state_up=False)
lb_dict = lb.get(self.root_tag)
lb = self.set_lb_status(lb_dict.get('id'))
path = self._get_full_path(self.LB_PATH.format(
lb_id=lb_dict.get('id')) + "/failover")
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.NOAUTH)
with mock.patch.object(octavia.common.context.Context, 'project_id',
project_id_2):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': project_id_2}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
self.app.put(path, status=202)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
def test_create_with_bad_handler(self):
self.handler_mock().load_balancer.create.side_effect = Exception()
api_lb = self.create_load_balancer(

View File

@@ -56,6 +56,11 @@ class TestEndpoint(base.TestCase):
self.ep.worker.delete_load_balancer.assert_called_once_with(
self.resource_id, False)
def test_failover_load_balancer(self):
self.ep.failover_load_balancer(self.context, self.resource_id)
self.ep.worker.failover_loadbalancer.assert_called_once_with(
self.resource_id)
def test_create_listener(self):
self.ep.create_listener(self.context, self.resource_id)
self.ep.worker.create_listener.assert_called_once_with(

View File

@@ -1101,6 +1101,49 @@ class TestControllerWorker(base.TestCase):
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.'
'controller_worker.ControllerWorker._perform_amphora_failover')
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
def test_failover_loadbalancer(self,
mock_update,
mock_perform,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
_amphora_mock2 = mock.MagicMock()
_load_balancer_mock.amphorae = [_amphora_mock, _amphora_mock2]
cw = controller_worker.ControllerWorker()
cw.failover_loadbalancer('123')
mock_perform.assert_called_with(
_amphora_mock2, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY)
mock_update.assert_called_with('TEST', '123',
provisioning_status=constants.ACTIVE)
mock_perform.reset_mock()
_load_balancer_mock.amphorae = [_amphora_mock, _amphora_mock2]
_amphora_mock2.role = constants.ROLE_BACKUP
cw.failover_loadbalancer('123')
# Because mock2 now gets failed over first, _amphora_mock is the
# last one to be failed over
mock_perform.assert_called_with(
_amphora_mock, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY)
mock_update.assert_called_with('TEST', '123',
provisioning_status=constants.ACTIVE)
mock_perform.reset_mock()
mock_perform.side_effect = OverflowError()
self.assertRaises(OverflowError, cw.failover_loadbalancer, 123)
mock_update.assert_called_with('TEST', 123,
provisioning_status=constants.ERROR)
@mock.patch('octavia.controller.worker.flows.'
'amphora_flows.AmphoraFlows.get_failover_flow',
return_value=_flow_mock)