Merge "Create Amphora V2 provider driver"
This commit is contained in:
commit
ff4680eb71
344
octavia/api/drivers/amphora_driver/v2/driver.py
Normal file
344
octavia/api/drivers/amphora_driver/v2/driver.py
Normal file
@ -0,0 +1,344 @@
|
||||
# Copyright 2018 Rackspace, US Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from jsonschema import exceptions as js_exceptions
|
||||
from jsonschema import validate
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from stevedore import driver as stevedore_driver
|
||||
|
||||
from octavia_lib.api.drivers import data_models as driver_dm
|
||||
from octavia_lib.api.drivers import exceptions
|
||||
from octavia_lib.api.drivers import provider_base as driver_base
|
||||
|
||||
from octavia.api.drivers.amphora_driver import flavor_schema
|
||||
from octavia.api.drivers import utils as driver_utils
|
||||
from octavia.common import constants as consts
|
||||
from octavia.common import data_models
|
||||
from octavia.common import rpc
|
||||
from octavia.common import utils
|
||||
from octavia.db import api as db_apis
|
||||
from octavia.db import repositories
|
||||
from octavia.network import base as network_base
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.import_group('oslo_messaging', 'octavia.common.config')
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AmphoraProviderDriver(driver_base.ProviderDriver):
|
||||
def __init__(self):
|
||||
super(AmphoraProviderDriver, self).__init__()
|
||||
self.target = messaging.Target(
|
||||
namespace=consts.RPC_NAMESPACE_CONTROLLER_AGENT,
|
||||
topic=consts.TOPIC_AMPHORA_V2, version="2.0", fanout=False)
|
||||
self.client = rpc.get_client(self.target)
|
||||
self.repositories = repositories.Repositories()
|
||||
|
||||
# Load Balancer
|
||||
def create_vip_port(self, loadbalancer_id, project_id, vip_dictionary):
|
||||
vip_obj = driver_utils.provider_vip_dict_to_vip_obj(vip_dictionary)
|
||||
lb_obj = data_models.LoadBalancer(id=loadbalancer_id,
|
||||
project_id=project_id, vip=vip_obj)
|
||||
|
||||
network_driver = utils.get_network_driver()
|
||||
try:
|
||||
vip = network_driver.allocate_vip(lb_obj)
|
||||
except network_base.AllocateVIPException as e:
|
||||
raise exceptions.DriverError(user_fault_string=e.orig_msg,
|
||||
operator_fault_string=e.orig_msg)
|
||||
|
||||
LOG.info('Amphora provider created VIP port %s for load balancer %s.',
|
||||
vip.port_id, loadbalancer_id)
|
||||
return driver_utils.vip_dict_to_provider_dict(vip.to_dict())
|
||||
|
||||
# TODO(johnsom) convert this to octavia_lib constant flavor
|
||||
# once octavia is transitioned to use octavia_lib
|
||||
def loadbalancer_create(self, loadbalancer):
|
||||
if loadbalancer.flavor == driver_dm.Unset:
|
||||
loadbalancer.flavor = None
|
||||
payload = {consts.LOAD_BALANCER_ID: loadbalancer.loadbalancer_id,
|
||||
consts.FLAVOR: loadbalancer.flavor}
|
||||
self.client.cast({}, 'create_load_balancer', **payload)
|
||||
|
||||
def loadbalancer_delete(self, loadbalancer, cascade=False):
|
||||
loadbalancer_id = loadbalancer.loadbalancer_id
|
||||
payload = {consts.LOAD_BALANCER_ID: loadbalancer_id,
|
||||
'cascade': cascade}
|
||||
self.client.cast({}, 'delete_load_balancer', **payload)
|
||||
|
||||
def loadbalancer_failover(self, loadbalancer_id):
|
||||
payload = {consts.LOAD_BALANCER_ID: loadbalancer_id}
|
||||
self.client.cast({}, 'failover_load_balancer', **payload)
|
||||
|
||||
def loadbalancer_update(self, old_loadbalancer, new_loadbalancer):
|
||||
# Adapt the provider data model to the queue schema
|
||||
lb_dict = new_loadbalancer.to_dict()
|
||||
if 'admin_state_up' in lb_dict:
|
||||
lb_dict['enabled'] = lb_dict.pop('admin_state_up')
|
||||
lb_id = lb_dict.pop('loadbalancer_id')
|
||||
# Put the qos_policy_id back under the vip element the controller
|
||||
# expects
|
||||
vip_qos_policy_id = lb_dict.pop('vip_qos_policy_id', None)
|
||||
if vip_qos_policy_id:
|
||||
vip_dict = {"qos_policy_id": vip_qos_policy_id}
|
||||
lb_dict["vip"] = vip_dict
|
||||
|
||||
payload = {consts.LOAD_BALANCER_ID: lb_id,
|
||||
consts.LOAD_BALANCER_UPDATES: lb_dict}
|
||||
self.client.cast({}, 'update_load_balancer', **payload)
|
||||
|
||||
# Listener
|
||||
def listener_create(self, listener):
|
||||
payload = {consts.LISTENER_ID: listener.listener_id}
|
||||
self.client.cast({}, 'create_listener', **payload)
|
||||
|
||||
def listener_delete(self, listener):
|
||||
listener_id = listener.listener_id
|
||||
payload = {consts.LISTENER_ID: listener_id}
|
||||
self.client.cast({}, 'delete_listener', **payload)
|
||||
|
||||
def listener_update(self, old_listener, new_listener):
|
||||
listener_dict = new_listener.to_dict()
|
||||
if 'admin_state_up' in listener_dict:
|
||||
listener_dict['enabled'] = listener_dict.pop('admin_state_up')
|
||||
listener_id = listener_dict.pop('listener_id')
|
||||
if 'client_ca_tls_container_ref' in listener_dict:
|
||||
listener_dict['client_ca_tls_container_id'] = listener_dict.pop(
|
||||
'client_ca_tls_container_ref')
|
||||
listener_dict.pop('client_ca_tls_container_data', None)
|
||||
if 'client_crl_container_ref' in listener_dict:
|
||||
listener_dict['client_crl_container_id'] = listener_dict.pop(
|
||||
'client_crl_container_ref')
|
||||
listener_dict.pop('client_crl_container_data', None)
|
||||
|
||||
payload = {consts.LISTENER_ID: listener_id,
|
||||
consts.LISTENER_UPDATES: listener_dict}
|
||||
self.client.cast({}, 'update_listener', **payload)
|
||||
|
||||
# Pool
|
||||
def pool_create(self, pool):
|
||||
payload = {consts.POOL_ID: pool.pool_id}
|
||||
self.client.cast({}, 'create_pool', **payload)
|
||||
|
||||
def pool_delete(self, pool):
|
||||
pool_id = pool.pool_id
|
||||
payload = {consts.POOL_ID: pool_id}
|
||||
self.client.cast({}, 'delete_pool', **payload)
|
||||
|
||||
def pool_update(self, old_pool, new_pool):
|
||||
pool_dict = new_pool.to_dict()
|
||||
if 'admin_state_up' in pool_dict:
|
||||
pool_dict['enabled'] = pool_dict.pop('admin_state_up')
|
||||
pool_id = pool_dict.pop('pool_id')
|
||||
if 'tls_container_ref' in pool_dict:
|
||||
pool_dict['tls_container_id'] = pool_dict.pop('tls_container_ref')
|
||||
pool_dict.pop('tls_container_data', None)
|
||||
if 'ca_tls_container_ref' in pool_dict:
|
||||
pool_dict['ca_tls_certificate_id'] = pool_dict.pop(
|
||||
'ca_tls_container_ref')
|
||||
pool_dict.pop('ca_tls_container_data', None)
|
||||
if 'client_crl_container_ref' in pool_dict:
|
||||
pool_dict['client_crl_container_id'] = pool_dict.pop(
|
||||
'client_crl_container_ref')
|
||||
pool_dict.pop('client_crl_container_data', None)
|
||||
|
||||
payload = {consts.POOL_ID: pool_id,
|
||||
consts.POOL_UPDATES: pool_dict}
|
||||
self.client.cast({}, 'update_pool', **payload)
|
||||
|
||||
# Member
|
||||
def member_create(self, member):
|
||||
payload = {consts.MEMBER_ID: member.member_id}
|
||||
self.client.cast({}, 'create_member', **payload)
|
||||
|
||||
def member_delete(self, member):
|
||||
member_id = member.member_id
|
||||
payload = {consts.MEMBER_ID: member_id}
|
||||
self.client.cast({}, 'delete_member', **payload)
|
||||
|
||||
def member_update(self, old_member, new_member):
|
||||
member_dict = new_member.to_dict()
|
||||
if 'admin_state_up' in member_dict:
|
||||
member_dict['enabled'] = member_dict.pop('admin_state_up')
|
||||
member_id = member_dict.pop('member_id')
|
||||
|
||||
payload = {consts.MEMBER_ID: member_id,
|
||||
consts.MEMBER_UPDATES: member_dict}
|
||||
self.client.cast({}, 'update_member', **payload)
|
||||
|
||||
def member_batch_update(self, members):
|
||||
# Get a list of existing members
|
||||
pool_id = members[0].pool_id
|
||||
# The DB should not have updated yet, so we can still use the pool
|
||||
db_pool = self.repositories.pool.get(db_apis.get_session(), id=pool_id)
|
||||
old_members = db_pool.members
|
||||
|
||||
old_member_ids = [m.id for m in old_members]
|
||||
# The driver will always pass objects with IDs.
|
||||
new_member_ids = [m.member_id for m in members]
|
||||
|
||||
# Find members that are brand new or updated
|
||||
new_members = []
|
||||
updated_members = []
|
||||
for m in members:
|
||||
if m.member_id not in old_member_ids:
|
||||
new_members.append(m)
|
||||
else:
|
||||
member_dict = m.to_dict(render_unsets=False)
|
||||
member_dict['id'] = member_dict.pop('member_id')
|
||||
if 'address' in member_dict:
|
||||
member_dict['ip_address'] = member_dict.pop('address')
|
||||
if 'admin_state_up' in member_dict:
|
||||
member_dict['enabled'] = member_dict.pop('admin_state_up')
|
||||
updated_members.append(member_dict)
|
||||
|
||||
# Find members that are deleted
|
||||
deleted_members = []
|
||||
for m in old_members:
|
||||
if m.id not in new_member_ids:
|
||||
deleted_members.append(m)
|
||||
|
||||
payload = {'old_member_ids': [m.id for m in deleted_members],
|
||||
'new_member_ids': [m.member_id for m in new_members],
|
||||
'updated_members': updated_members}
|
||||
self.client.cast({}, 'batch_update_members', **payload)
|
||||
|
||||
# Health Monitor
|
||||
def health_monitor_create(self, healthmonitor):
|
||||
payload = {consts.HEALTH_MONITOR_ID: healthmonitor.healthmonitor_id}
|
||||
self.client.cast({}, 'create_health_monitor', **payload)
|
||||
|
||||
def health_monitor_delete(self, healthmonitor):
|
||||
healthmonitor_id = healthmonitor.healthmonitor_id
|
||||
payload = {consts.HEALTH_MONITOR_ID: healthmonitor_id}
|
||||
self.client.cast({}, 'delete_health_monitor', **payload)
|
||||
|
||||
def health_monitor_update(self, old_healthmonitor, new_healthmonitor):
|
||||
healthmon_dict = new_healthmonitor.to_dict()
|
||||
if 'admin_state_up' in healthmon_dict:
|
||||
healthmon_dict['enabled'] = healthmon_dict.pop('admin_state_up')
|
||||
if 'max_retries_down' in healthmon_dict:
|
||||
healthmon_dict['fall_threshold'] = healthmon_dict.pop(
|
||||
'max_retries_down')
|
||||
if 'max_retries' in healthmon_dict:
|
||||
healthmon_dict['rise_threshold'] = healthmon_dict.pop(
|
||||
'max_retries')
|
||||
healthmon_id = healthmon_dict.pop('healthmonitor_id')
|
||||
|
||||
payload = {consts.HEALTH_MONITOR_ID: healthmon_id,
|
||||
consts.HEALTH_MONITOR_UPDATES: healthmon_dict}
|
||||
self.client.cast({}, 'update_health_monitor', **payload)
|
||||
|
||||
# L7 Policy
|
||||
def l7policy_create(self, l7policy):
|
||||
payload = {consts.L7POLICY_ID: l7policy.l7policy_id}
|
||||
self.client.cast({}, 'create_l7policy', **payload)
|
||||
|
||||
def l7policy_delete(self, l7policy):
|
||||
l7policy_id = l7policy.l7policy_id
|
||||
payload = {consts.L7POLICY_ID: l7policy_id}
|
||||
self.client.cast({}, 'delete_l7policy', **payload)
|
||||
|
||||
def l7policy_update(self, old_l7policy, new_l7policy):
|
||||
l7policy_dict = new_l7policy.to_dict()
|
||||
if 'admin_state_up' in l7policy_dict:
|
||||
l7policy_dict['enabled'] = l7policy_dict.pop('admin_state_up')
|
||||
l7policy_id = l7policy_dict.pop('l7policy_id')
|
||||
|
||||
payload = {consts.L7POLICY_ID: l7policy_id,
|
||||
consts.L7POLICY_UPDATES: l7policy_dict}
|
||||
self.client.cast({}, 'update_l7policy', **payload)
|
||||
|
||||
# L7 Rule
|
||||
def l7rule_create(self, l7rule):
|
||||
payload = {consts.L7RULE_ID: l7rule.l7rule_id}
|
||||
self.client.cast({}, 'create_l7rule', **payload)
|
||||
|
||||
def l7rule_delete(self, l7rule):
|
||||
l7rule_id = l7rule.l7rule_id
|
||||
payload = {consts.L7RULE_ID: l7rule_id}
|
||||
self.client.cast({}, 'delete_l7rule', **payload)
|
||||
|
||||
def l7rule_update(self, old_l7rule, new_l7rule):
|
||||
l7rule_dict = new_l7rule.to_dict()
|
||||
if 'admin_state_up' in l7rule_dict:
|
||||
l7rule_dict['enabled'] = l7rule_dict.pop('admin_state_up')
|
||||
l7rule_id = l7rule_dict.pop('l7rule_id')
|
||||
|
||||
payload = {consts.L7RULE_ID: l7rule_id,
|
||||
consts.L7RULE_UPDATES: l7rule_dict}
|
||||
self.client.cast({}, 'update_l7rule', **payload)
|
||||
|
||||
# Flavor
|
||||
def get_supported_flavor_metadata(self):
|
||||
"""Returns the valid flavor metadata keys and descriptions.
|
||||
|
||||
This extracts the valid flavor metadata keys and descriptions
|
||||
from the JSON validation schema and returns it as a dictionary.
|
||||
|
||||
:return: Dictionary of flavor metadata keys and descriptions.
|
||||
:raises DriverError: An unexpected error occurred.
|
||||
"""
|
||||
try:
|
||||
props = flavor_schema.SUPPORTED_FLAVOR_SCHEMA['properties']
|
||||
return {k: v.get('description', '') for k, v in props.items()}
|
||||
except Exception as e:
|
||||
raise exceptions.DriverError(
|
||||
user_fault_string='Failed to get the supported flavor '
|
||||
'metadata due to: {}'.format(str(e)),
|
||||
operator_fault_string='Failed to get the supported flavor '
|
||||
'metadata due to: {}'.format(str(e)))
|
||||
|
||||
def validate_flavor(self, flavor_dict):
|
||||
"""Validates flavor profile data.
|
||||
|
||||
This will validate a flavor profile dataset against the flavor
|
||||
settings the amphora driver supports.
|
||||
|
||||
:param flavor_dict: The flavor dictionary to validate.
|
||||
:type flavor: dict
|
||||
:return: None
|
||||
:raises DriverError: An unexpected error occurred.
|
||||
:raises UnsupportedOptionError: If the driver does not support
|
||||
one of the flavor settings.
|
||||
"""
|
||||
try:
|
||||
validate(flavor_dict, flavor_schema.SUPPORTED_FLAVOR_SCHEMA)
|
||||
except js_exceptions.ValidationError as e:
|
||||
error_object = ''
|
||||
if e.relative_path:
|
||||
error_object = '{} '.format(e.relative_path[0])
|
||||
raise exceptions.UnsupportedOptionError(
|
||||
user_fault_string='{0}{1}'.format(error_object, e.message),
|
||||
operator_fault_string=str(e))
|
||||
except Exception as e:
|
||||
raise exceptions.DriverError(
|
||||
user_fault_string='Failed to validate the flavor metadata '
|
||||
'due to: {}'.format(str(e)),
|
||||
operator_fault_string='Failed to validate the flavor metadata '
|
||||
'due to: {}'.format(str(e)))
|
||||
compute_flavor = flavor_dict.get(consts.COMPUTE_FLAVOR, None)
|
||||
if compute_flavor:
|
||||
compute_driver = stevedore_driver.DriverManager(
|
||||
namespace='octavia.compute.drivers',
|
||||
name=CONF.controller_worker.compute_driver,
|
||||
invoke_on_load=True
|
||||
).driver
|
||||
|
||||
# TODO(johnsom) Fix this to raise a NotFound error
|
||||
# when the octavia-lib supports it.
|
||||
compute_driver.validate_flavor(compute_flavor)
|
@ -301,7 +301,9 @@ class ListenersController(base.BaseController):
|
||||
|
||||
# re-inject the sni container references lost due to SNI
|
||||
# being a separate table in the DB
|
||||
provider_listener.sni_container_refs = listener.sni_container_refs
|
||||
if listener.sni_container_refs != wtypes.Unset:
|
||||
provider_listener.sni_container_refs = (
|
||||
listener.sni_container_refs)
|
||||
|
||||
# Dispatch to the driver
|
||||
LOG.info("Sending create Listener %s to provider %s",
|
||||
|
@ -20,7 +20,8 @@ from oslo_config import cfg
|
||||
from oslo_reports import guru_meditation_report as gmr
|
||||
|
||||
from octavia.common import service as octavia_service
|
||||
from octavia.controller.queue import consumer
|
||||
from octavia.controller.queue.v1 import consumer as consumer_v1
|
||||
from octavia.controller.queue.v2 import consumer as consumer_v2
|
||||
from octavia import version
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -32,7 +33,9 @@ def main():
|
||||
gmr.TextGuruMeditation.setup_autorun(version)
|
||||
|
||||
sm = cotyledon.ServiceManager()
|
||||
sm.add(consumer.ConsumerService, workers=CONF.controller_worker.workers,
|
||||
sm.add(consumer_v1.ConsumerService, workers=CONF.controller_worker.workers,
|
||||
args=(CONF,))
|
||||
sm.add(consumer_v2.ConsumerService,
|
||||
workers=CONF.controller_worker.workers, args=(CONF,))
|
||||
oslo_config_glue.setup(sm, CONF, reload_method="mutate")
|
||||
sm.run()
|
||||
|
@ -656,3 +656,5 @@ CLIENT_AUTH_OPTIONAL = 'OPTIONAL'
|
||||
CLIENT_AUTH_MANDATORY = 'MANDATORY'
|
||||
SUPPORTED_CLIENT_AUTH_MODES = [CLIENT_AUTH_NONE, CLIENT_AUTH_OPTIONAL,
|
||||
CLIENT_AUTH_MANDATORY]
|
||||
|
||||
TOPIC_AMPHORA_V2 = 'octavia_provisioning_v2'
|
||||
|
@ -23,7 +23,7 @@ from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.worker import controller_worker as cw
|
||||
from octavia.controller.worker.v1 import controller_worker as cw
|
||||
from octavia.db import api as db_api
|
||||
from octavia.db import repositories as repo
|
||||
|
||||
|
@ -20,7 +20,7 @@ from oslo_log import log as logging
|
||||
from oslo_utils import timeutils
|
||||
from sqlalchemy.orm import exc as sqlalchemy_exceptions
|
||||
|
||||
from octavia.controller.worker import controller_worker as cw
|
||||
from octavia.controller.worker.v1 import controller_worker as cw
|
||||
from octavia.db import api as db_api
|
||||
from octavia.db import repositories as repo
|
||||
|
||||
|
@ -18,7 +18,7 @@ import oslo_messaging as messaging
|
||||
from oslo_messaging.rpc import dispatcher
|
||||
|
||||
from octavia.common import rpc
|
||||
from octavia.controller.queue import endpoint
|
||||
from octavia.controller.queue.v1 import endpoints
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -38,7 +38,7 @@ class ConsumerService(cotyledon.Service):
|
||||
LOG.info('Starting consumer...')
|
||||
target = messaging.Target(topic=self.topic, server=self.server,
|
||||
fanout=False)
|
||||
self.endpoints = [endpoint.Endpoint()]
|
||||
self.endpoints = [endpoints.Endpoints()]
|
||||
self.message_listener = rpc.get_server(
|
||||
target, self.endpoints,
|
||||
executor='threading',
|
@ -24,7 +24,7 @@ CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Endpoint(object):
|
||||
class Endpoints(object):
|
||||
|
||||
# API version history:
|
||||
# 1.0 - Initial version.
|
65
octavia/controller/queue/v2/consumer.py
Normal file
65
octavia/controller/queue/v2/consumer.py
Normal file
@ -0,0 +1,65 @@
|
||||
# Copyright 2014 Rackspace
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import cotyledon
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from oslo_messaging.rpc import dispatcher
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.common import rpc
|
||||
from octavia.controller.queue.v2 import endpoints
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConsumerService(cotyledon.Service):
|
||||
|
||||
def __init__(self, worker_id, conf):
|
||||
super(ConsumerService, self).__init__(worker_id)
|
||||
self.conf = conf
|
||||
self.topic = constants.TOPIC_AMPHORA_V2
|
||||
self.server = conf.host
|
||||
self.endpoints = []
|
||||
self.access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||
self.message_listener = None
|
||||
|
||||
def run(self):
|
||||
LOG.info('Starting V2 consumer...')
|
||||
target = messaging.Target(topic=self.topic, server=self.server,
|
||||
fanout=False)
|
||||
self.endpoints = [endpoints.Endpoints()]
|
||||
self.message_listener = rpc.get_server(
|
||||
target, self.endpoints,
|
||||
executor='threading',
|
||||
access_policy=self.access_policy
|
||||
)
|
||||
self.message_listener.start()
|
||||
|
||||
def terminate(self, graceful=False):
|
||||
if self.message_listener:
|
||||
LOG.info('Stopping V2 consumer...')
|
||||
self.message_listener.stop()
|
||||
if graceful:
|
||||
LOG.info('V2 Consumer successfully stopped. Waiting for '
|
||||
'final messages to be processed...')
|
||||
self.message_listener.wait()
|
||||
if self.endpoints:
|
||||
LOG.info('Shutting down V2 endpoint worker executors...')
|
||||
for e in self.endpoints:
|
||||
try:
|
||||
e.worker.executor.shutdown()
|
||||
except AttributeError:
|
||||
pass
|
||||
super(ConsumerService, self).terminate()
|
156
octavia/controller/queue/v2/endpoints.py
Normal file
156
octavia/controller/queue/v2/endpoints.py
Normal file
@ -0,0 +1,156 @@
|
||||
# Copyright 2014 Rackspace
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from stevedore import driver as stevedore_driver
|
||||
|
||||
from octavia.common import constants
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Endpoints(object):
|
||||
|
||||
# API version history:
|
||||
# 1.0 - Initial version.
|
||||
# 2.0 - Provider driver format
|
||||
target = messaging.Target(
|
||||
namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT,
|
||||
version='2.0')
|
||||
|
||||
def __init__(self):
|
||||
self.worker = stevedore_driver.DriverManager(
|
||||
namespace='octavia.plugins',
|
||||
name=CONF.octavia_plugins,
|
||||
invoke_on_load=True
|
||||
).driver
|
||||
|
||||
def create_load_balancer(self, context, load_balancer_id,
|
||||
flavor=None):
|
||||
LOG.info('Creating load balancer \'%s\'...', load_balancer_id)
|
||||
self.worker.create_load_balancer(load_balancer_id, flavor)
|
||||
|
||||
def update_load_balancer(self, context, load_balancer_id,
|
||||
load_balancer_updates):
|
||||
LOG.info('Updating load balancer \'%s\'...', load_balancer_id)
|
||||
self.worker.update_load_balancer(load_balancer_id,
|
||||
load_balancer_updates)
|
||||
|
||||
def delete_load_balancer(self, context, load_balancer_id, cascade=False):
|
||||
LOG.info('Deleting load balancer \'%s\'...', load_balancer_id)
|
||||
self.worker.delete_load_balancer(load_balancer_id, cascade)
|
||||
|
||||
def failover_load_balancer(self, context, load_balancer_id):
|
||||
LOG.info('Failing over amphora in load balancer \'%s\'...',
|
||||
load_balancer_id)
|
||||
self.worker.failover_loadbalancer(load_balancer_id)
|
||||
|
||||
def failover_amphora(self, context, amphora_id):
|
||||
LOG.info('Failing over amphora \'%s\'...',
|
||||
amphora_id)
|
||||
self.worker.failover_amphora(amphora_id)
|
||||
|
||||
def create_listener(self, context, listener_id):
|
||||
LOG.info('Creating listener \'%s\'...', listener_id)
|
||||
self.worker.create_listener(listener_id)
|
||||
|
||||
def update_listener(self, context, listener_id, listener_updates):
|
||||
LOG.info('Updating listener \'%s\'...', listener_id)
|
||||
self.worker.update_listener(listener_id, listener_updates)
|
||||
|
||||
def delete_listener(self, context, listener_id):
|
||||
LOG.info('Deleting listener \'%s\'...', listener_id)
|
||||
self.worker.delete_listener(listener_id)
|
||||
|
||||
def create_pool(self, context, pool_id):
|
||||
LOG.info('Creating pool \'%s\'...', pool_id)
|
||||
self.worker.create_pool(pool_id)
|
||||
|
||||
def update_pool(self, context, pool_id, pool_updates):
|
||||
LOG.info('Updating pool \'%s\'...', pool_id)
|
||||
self.worker.update_pool(pool_id, pool_updates)
|
||||
|
||||
def delete_pool(self, context, pool_id):
|
||||
LOG.info('Deleting pool \'%s\'...', pool_id)
|
||||
self.worker.delete_pool(pool_id)
|
||||
|
||||
def create_health_monitor(self, context, health_monitor_id):
|
||||
LOG.info('Creating health monitor \'%s\'...', health_monitor_id)
|
||||
self.worker.create_health_monitor(health_monitor_id)
|
||||
|
||||
def update_health_monitor(self, context, health_monitor_id,
|
||||
health_monitor_updates):
|
||||
LOG.info('Updating health monitor \'%s\'...', health_monitor_id)
|
||||
self.worker.update_health_monitor(health_monitor_id,
|
||||
health_monitor_updates)
|
||||
|
||||
def delete_health_monitor(self, context, health_monitor_id):
|
||||
LOG.info('Deleting health monitor \'%s\'...', health_monitor_id)
|
||||
self.worker.delete_health_monitor(health_monitor_id)
|
||||
|
||||
def create_member(self, context, member_id):
|
||||
LOG.info('Creating member \'%s\'...', member_id)
|
||||
self.worker.create_member(member_id)
|
||||
|
||||
def update_member(self, context, member_id, member_updates):
|
||||
LOG.info('Updating member \'%s\'...', member_id)
|
||||
self.worker.update_member(member_id, member_updates)
|
||||
|
||||
def batch_update_members(self, context, old_member_ids, new_member_ids,
|
||||
updated_members):
|
||||
updated_member_ids = [m.get('id') for m in updated_members]
|
||||
LOG.info(
|
||||
'Batch updating members: old=\'%(old)s\', new=\'%(new)s\', '
|
||||
'updated=\'%(updated)s\'...',
|
||||
{'old': old_member_ids, 'new': new_member_ids,
|
||||
'updated': updated_member_ids})
|
||||
self.worker.batch_update_members(
|
||||
old_member_ids, new_member_ids, updated_members)
|
||||
|
||||
def delete_member(self, context, member_id):
|
||||
LOG.info('Deleting member \'%s\'...', member_id)
|
||||
self.worker.delete_member(member_id)
|
||||
|
||||
def create_l7policy(self, context, l7policy_id):
|
||||
LOG.info('Creating l7policy \'%s\'...', l7policy_id)
|
||||
self.worker.create_l7policy(l7policy_id)
|
||||
|
||||
def update_l7policy(self, context, l7policy_id, l7policy_updates):
|
||||
LOG.info('Updating l7policy \'%s\'...', l7policy_id)
|
||||
self.worker.update_l7policy(l7policy_id, l7policy_updates)
|
||||
|
||||
def delete_l7policy(self, context, l7policy_id):
|
||||
LOG.info('Deleting l7policy \'%s\'...', l7policy_id)
|
||||
self.worker.delete_l7policy(l7policy_id)
|
||||
|
||||
def create_l7rule(self, context, l7rule_id):
|
||||
LOG.info('Creating l7rule \'%s\'...', l7rule_id)
|
||||
self.worker.create_l7rule(l7rule_id)
|
||||
|
||||
def update_l7rule(self, context, l7rule_id, l7rule_updates):
|
||||
LOG.info('Updating l7rule \'%s\'...', l7rule_id)
|
||||
self.worker.update_l7rule(l7rule_id, l7rule_updates)
|
||||
|
||||
def delete_l7rule(self, context, l7rule_id):
|
||||
LOG.info('Deleting l7rule \'%s\'...', l7rule_id)
|
||||
self.worker.delete_l7rule(l7rule_id)
|
||||
|
||||
def update_amphora_agent_config(self, context, amphora_id):
|
||||
LOG.info('Updating amphora \'%s\' agent configuration...',
|
||||
amphora_id)
|
||||
self.worker.update_amphora_agent_config(amphora_id)
|
11
octavia/controller/worker/v1/__init__.py
Normal file
11
octavia/controller/worker/v1/__init__.py
Normal file
@ -0,0 +1,11 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
@ -23,14 +23,14 @@ import tenacity
|
||||
|
||||
from octavia.common import base_taskflow
|
||||
from octavia.common import constants
|
||||
from octavia.controller.worker.flows import amphora_flows
|
||||
from octavia.controller.worker.flows import health_monitor_flows
|
||||
from octavia.controller.worker.flows import l7policy_flows
|
||||
from octavia.controller.worker.flows import l7rule_flows
|
||||
from octavia.controller.worker.flows import listener_flows
|
||||
from octavia.controller.worker.flows import load_balancer_flows
|
||||
from octavia.controller.worker.flows import member_flows
|
||||
from octavia.controller.worker.flows import pool_flows
|
||||
from octavia.controller.worker.v1.flows import amphora_flows
|
||||
from octavia.controller.worker.v1.flows import health_monitor_flows
|
||||
from octavia.controller.worker.v1.flows import l7policy_flows
|
||||
from octavia.controller.worker.v1.flows import l7rule_flows
|
||||
from octavia.controller.worker.v1.flows import listener_flows
|
||||
from octavia.controller.worker.v1.flows import load_balancer_flows
|
||||
from octavia.controller.worker.v1.flows import member_flows
|
||||
from octavia.controller.worker.v1.flows import pool_flows
|
||||
from octavia.db import api as db_apis
|
||||
from octavia.db import repositories as repo
|
||||
|
11
octavia/controller/worker/v1/flows/__init__.py
Normal file
11
octavia/controller/worker/v1/flows/__init__.py
Normal file
@ -0,0 +1,11 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
@ -19,12 +19,12 @@ from taskflow.patterns import linear_flow
|
||||
from taskflow.patterns import unordered_flow
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.worker.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.tasks import cert_task
|
||||
from octavia.controller.worker.tasks import compute_tasks
|
||||
from octavia.controller.worker.tasks import database_tasks
|
||||
from octavia.controller.worker.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.tasks import network_tasks
|
||||
from octavia.controller.worker.v1.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.v1.tasks import cert_task
|
||||
from octavia.controller.worker.v1.tasks import compute_tasks
|
||||
from octavia.controller.worker.v1.tasks import database_tasks
|
||||
from octavia.controller.worker.v1.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.v1.tasks import network_tasks
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
@ -16,10 +16,10 @@
|
||||
from taskflow.patterns import linear_flow
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.worker.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.tasks import database_tasks
|
||||
from octavia.controller.worker.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.tasks import model_tasks
|
||||
from octavia.controller.worker.v1.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.v1.tasks import database_tasks
|
||||
from octavia.controller.worker.v1.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.v1.tasks import model_tasks
|
||||
|
||||
|
||||
class HealthMonitorFlows(object):
|
@ -16,10 +16,10 @@
|
||||
from taskflow.patterns import linear_flow
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.worker.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.tasks import database_tasks
|
||||
from octavia.controller.worker.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.tasks import model_tasks
|
||||
from octavia.controller.worker.v1.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.v1.tasks import database_tasks
|
||||
from octavia.controller.worker.v1.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.v1.tasks import model_tasks
|
||||
|
||||
|
||||
class L7PolicyFlows(object):
|
@ -16,10 +16,10 @@
|
||||
from taskflow.patterns import linear_flow
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.worker.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.tasks import database_tasks
|
||||
from octavia.controller.worker.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.tasks import model_tasks
|
||||
from octavia.controller.worker.v1.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.v1.tasks import database_tasks
|
||||
from octavia.controller.worker.v1.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.v1.tasks import model_tasks
|
||||
|
||||
|
||||
class L7RuleFlows(object):
|
@ -16,10 +16,10 @@
|
||||
from taskflow.patterns import linear_flow
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.worker.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.tasks import database_tasks
|
||||
from octavia.controller.worker.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.tasks import network_tasks
|
||||
from octavia.controller.worker.v1.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.v1.tasks import database_tasks
|
||||
from octavia.controller.worker.v1.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.v1.tasks import network_tasks
|
||||
|
||||
|
||||
class ListenerFlows(object):
|
@ -20,15 +20,15 @@ from taskflow.patterns import unordered_flow
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.common import exceptions
|
||||
from octavia.controller.worker.flows import amphora_flows
|
||||
from octavia.controller.worker.flows import listener_flows
|
||||
from octavia.controller.worker.flows import member_flows
|
||||
from octavia.controller.worker.flows import pool_flows
|
||||
from octavia.controller.worker.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.tasks import compute_tasks
|
||||
from octavia.controller.worker.tasks import database_tasks
|
||||
from octavia.controller.worker.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.tasks import network_tasks
|
||||
from octavia.controller.worker.v1.flows import amphora_flows
|
||||
from octavia.controller.worker.v1.flows import listener_flows
|
||||
from octavia.controller.worker.v1.flows import member_flows
|
||||
from octavia.controller.worker.v1.flows import pool_flows
|
||||
from octavia.controller.worker.v1.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.v1.tasks import compute_tasks
|
||||
from octavia.controller.worker.v1.tasks import database_tasks
|
||||
from octavia.controller.worker.v1.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.v1.tasks import network_tasks
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
@ -17,11 +17,11 @@ from taskflow.patterns import linear_flow
|
||||
from taskflow.patterns import unordered_flow
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.worker.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.tasks import database_tasks
|
||||
from octavia.controller.worker.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.tasks import model_tasks
|
||||
from octavia.controller.worker.tasks import network_tasks
|
||||
from octavia.controller.worker.v1.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.v1.tasks import database_tasks
|
||||
from octavia.controller.worker.v1.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.v1.tasks import model_tasks
|
||||
from octavia.controller.worker.v1.tasks import network_tasks
|
||||
|
||||
|
||||
class MemberFlows(object):
|
@ -16,10 +16,10 @@
|
||||
from taskflow.patterns import linear_flow
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.worker.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.tasks import database_tasks
|
||||
from octavia.controller.worker.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.tasks import model_tasks
|
||||
from octavia.controller.worker.v1.tasks import amphora_driver_tasks
|
||||
from octavia.controller.worker.v1.tasks import database_tasks
|
||||
from octavia.controller.worker.v1.tasks import lifecycle_tasks
|
||||
from octavia.controller.worker.v1.tasks import model_tasks
|
||||
|
||||
|
||||
class PoolFlows(object):
|
11
octavia/controller/worker/v1/tasks/__init__.py
Normal file
11
octavia/controller/worker/v1/tasks/__init__.py
Normal file
@ -0,0 +1,11 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
11
octavia/controller/worker/v2/__init__.py
Normal file
11
octavia/controller/worker/v2/__init__.py
Normal file
@ -0,0 +1,11 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
964
octavia/controller/worker/v2/controller_worker.py
Normal file
964
octavia/controller/worker/v2/controller_worker.py
Normal file
@ -0,0 +1,964 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
from sqlalchemy.orm import exc as db_exceptions
|
||||
from taskflow.listeners import logging as tf_logging
|
||||
import tenacity
|
||||
|
||||
from octavia.common import base_taskflow
|
||||
from octavia.common import constants
|
||||
from octavia.controller.worker.v2.flows import amphora_flows
|
||||
from octavia.controller.worker.v2.flows import health_monitor_flows
|
||||
from octavia.controller.worker.v2.flows import l7policy_flows
|
||||
from octavia.controller.worker.v2.flows import l7rule_flows
|
||||
from octavia.controller.worker.v2.flows import listener_flows
|
||||
from octavia.controller.worker.v2.flows import load_balancer_flows
|
||||
from octavia.controller.worker.v2.flows import member_flows
|
||||
from octavia.controller.worker.v2.flows import pool_flows
|
||||
from octavia.db import api as db_apis
|
||||
from octavia.db import repositories as repo
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
RETRY_ATTEMPTS = 15
|
||||
RETRY_INITIAL_DELAY = 1
|
||||
RETRY_BACKOFF = 1
|
||||
RETRY_MAX = 5
|
||||
|
||||
|
||||
def _is_provisioning_status_pending_update(lb_obj):
|
||||
return not lb_obj.provisioning_status == constants.PENDING_UPDATE
|
||||
|
||||
|
||||
class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
|
||||
def __init__(self):
|
||||
|
||||
self._amphora_flows = amphora_flows.AmphoraFlows()
|
||||
self._health_monitor_flows = health_monitor_flows.HealthMonitorFlows()
|
||||
self._lb_flows = load_balancer_flows.LoadBalancerFlows()
|
||||
self._listener_flows = listener_flows.ListenerFlows()
|
||||
self._member_flows = member_flows.MemberFlows()
|
||||
self._pool_flows = pool_flows.PoolFlows()
|
||||
self._l7policy_flows = l7policy_flows.L7PolicyFlows()
|
||||
self._l7rule_flows = l7rule_flows.L7RuleFlows()
|
||||
|
||||
self._amphora_repo = repo.AmphoraRepository()
|
||||
self._amphora_health_repo = repo.AmphoraHealthRepository()
|
||||
self._health_mon_repo = repo.HealthMonitorRepository()
|
||||
self._lb_repo = repo.LoadBalancerRepository()
|
||||
self._listener_repo = repo.ListenerRepository()
|
||||
self._member_repo = repo.MemberRepository()
|
||||
self._pool_repo = repo.PoolRepository()
|
||||
self._l7policy_repo = repo.L7PolicyRepository()
|
||||
self._l7rule_repo = repo.L7RuleRepository()
|
||||
self._flavor_repo = repo.FlavorRepository()
|
||||
|
||||
super(ControllerWorker, self).__init__()
|
||||
|
||||
@tenacity.retry(
|
||||
retry=(
|
||||
tenacity.retry_if_result(_is_provisioning_status_pending_update) |
|
||||
tenacity.retry_if_exception_type()),
|
||||
wait=tenacity.wait_incrementing(
|
||||
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
|
||||
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
|
||||
def _get_db_obj_until_pending_update(self, repo, id):
|
||||
|
||||
return repo.get(db_apis.get_session(), id=id)
|
||||
|
||||
def create_amphora(self):
|
||||
"""Creates an Amphora.
|
||||
|
||||
This is used to create spare amphora.
|
||||
|
||||
:returns: amphora_id
|
||||
"""
|
||||
try:
|
||||
create_amp_tf = self._taskflow_load(
|
||||
self._amphora_flows.get_create_amphora_flow(),
|
||||
store={constants.BUILD_TYPE_PRIORITY:
|
||||
constants.LB_CREATE_SPARES_POOL_PRIORITY,
|
||||
constants.FLAVOR: None}
|
||||
)
|
||||
with tf_logging.DynamicLoggingListener(create_amp_tf, log=LOG):
|
||||
create_amp_tf.run()
|
||||
|
||||
return create_amp_tf.storage.fetch('amphora')
|
||||
except Exception as e:
|
||||
LOG.error('Failed to create an amphora due to: {}'.format(str(e)))
|
||||
|
||||
def delete_amphora(self, amphora_id):
|
||||
"""Deletes an existing Amphora.
|
||||
|
||||
:param amphora_id: ID of the amphora to delete
|
||||
:returns: None
|
||||
:raises AmphoraNotFound: The referenced Amphora was not found
|
||||
"""
|
||||
amphora = self._amphora_repo.get(db_apis.get_session(),
|
||||
id=amphora_id)
|
||||
delete_amp_tf = self._taskflow_load(self._amphora_flows.
|
||||
get_delete_amphora_flow(),
|
||||
store={constants.AMPHORA: amphora})
|
||||
with tf_logging.DynamicLoggingListener(delete_amp_tf,
|
||||
log=LOG):
|
||||
delete_amp_tf.run()
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
||||
wait=tenacity.wait_incrementing(
|
||||
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
|
||||
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
|
||||
def create_health_monitor(self, health_monitor_id):
|
||||
"""Creates a health monitor.
|
||||
|
||||
:param pool_id: ID of the pool to create a health monitor on
|
||||
:returns: None
|
||||
:raises NoResultFound: Unable to find the object
|
||||
"""
|
||||
health_mon = self._health_mon_repo.get(db_apis.get_session(),
|
||||
id=health_monitor_id)
|
||||
if not health_mon:
|
||||
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
|
||||
'60 seconds.', 'health_monitor', health_monitor_id)
|
||||
raise db_exceptions.NoResultFound
|
||||
|
||||
pool = health_mon.pool
|
||||
listeners = pool.listeners
|
||||
pool.health_monitor = health_mon
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
create_hm_tf = self._taskflow_load(
|
||||
self._health_monitor_flows.get_create_health_monitor_flow(),
|
||||
store={constants.HEALTH_MON: health_mon,
|
||||
constants.POOL: pool,
|
||||
constants.LISTENERS: listeners,
|
||||
constants.LOADBALANCER: load_balancer})
|
||||
with tf_logging.DynamicLoggingListener(create_hm_tf,
|
||||
log=LOG):
|
||||
create_hm_tf.run()
|
||||
|
||||
def delete_health_monitor(self, health_monitor_id):
|
||||
"""Deletes a health monitor.
|
||||
|
||||
:param pool_id: ID of the pool to delete its health monitor
|
||||
:returns: None
|
||||
:raises HMNotFound: The referenced health monitor was not found
|
||||
"""
|
||||
health_mon = self._health_mon_repo.get(db_apis.get_session(),
|
||||
id=health_monitor_id)
|
||||
|
||||
pool = health_mon.pool
|
||||
listeners = pool.listeners
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
delete_hm_tf = self._taskflow_load(
|
||||
self._health_monitor_flows.get_delete_health_monitor_flow(),
|
||||
store={constants.HEALTH_MON: health_mon,
|
||||
constants.POOL: pool,
|
||||
constants.LISTENERS: listeners,
|
||||
constants.LOADBALANCER: load_balancer})
|
||||
with tf_logging.DynamicLoggingListener(delete_hm_tf,
|
||||
log=LOG):
|
||||
delete_hm_tf.run()
|
||||
|
||||
def update_health_monitor(self, health_monitor_id, health_monitor_updates):
|
||||
"""Updates a health monitor.
|
||||
|
||||
:param pool_id: ID of the pool to have it's health monitor updated
|
||||
:param health_monitor_updates: Dict containing updated health monitor
|
||||
:returns: None
|
||||
:raises HMNotFound: The referenced health monitor was not found
|
||||
"""
|
||||
health_mon = None
|
||||
try:
|
||||
health_mon = self._get_db_obj_until_pending_update(
|
||||
self._health_mon_repo, health_monitor_id)
|
||||
except tenacity.RetryError as e:
|
||||
LOG.warning('Health monitor did not go into %s in 60 seconds. '
|
||||
'This either due to an in-progress Octavia upgrade '
|
||||
'or an overloaded and failing database. Assuming '
|
||||
'an upgrade is in progress and continuing.',
|
||||
constants.PENDING_UPDATE)
|
||||
health_mon = e.last_attempt.result()
|
||||
|
||||
pool = health_mon.pool
|
||||
listeners = pool.listeners
|
||||
pool.health_monitor = health_mon
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
update_hm_tf = self._taskflow_load(
|
||||
self._health_monitor_flows.get_update_health_monitor_flow(),
|
||||
store={constants.HEALTH_MON: health_mon,
|
||||
constants.POOL: pool,
|
||||
constants.LISTENERS: listeners,
|
||||
constants.LOADBALANCER: load_balancer,
|
||||
constants.UPDATE_DICT: health_monitor_updates})
|
||||
with tf_logging.DynamicLoggingListener(update_hm_tf,
|
||||
log=LOG):
|
||||
update_hm_tf.run()
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
||||
wait=tenacity.wait_incrementing(
|
||||
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
|
||||
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
|
||||
def create_listener(self, listener_id):
|
||||
"""Creates a listener.
|
||||
|
||||
:param listener_id: ID of the listener to create
|
||||
:returns: None
|
||||
:raises NoResultFound: Unable to find the object
|
||||
"""
|
||||
listener = self._listener_repo.get(db_apis.get_session(),
|
||||
id=listener_id)
|
||||
if not listener:
|
||||
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
|
||||
'60 seconds.', 'listener', listener_id)
|
||||
raise db_exceptions.NoResultFound
|
||||
|
||||
load_balancer = listener.load_balancer
|
||||
|
||||
create_listener_tf = self._taskflow_load(self._listener_flows.
|
||||
get_create_listener_flow(),
|
||||
store={constants.LOADBALANCER:
|
||||
load_balancer,
|
||||
constants.LISTENERS:
|
||||
[listener]})
|
||||
with tf_logging.DynamicLoggingListener(create_listener_tf,
|
||||
log=LOG):
|
||||
create_listener_tf.run()
|
||||
|
||||
def delete_listener(self, listener_id):
|
||||
"""Deletes a listener.
|
||||
|
||||
:param listener_id: ID of the listener to delete
|
||||
:returns: None
|
||||
:raises ListenerNotFound: The referenced listener was not found
|
||||
"""
|
||||
listener = self._listener_repo.get(db_apis.get_session(),
|
||||
id=listener_id)
|
||||
load_balancer = listener.load_balancer
|
||||
|
||||
delete_listener_tf = self._taskflow_load(
|
||||
self._listener_flows.get_delete_listener_flow(),
|
||||
store={constants.LOADBALANCER: load_balancer,
|
||||
constants.LISTENER: listener})
|
||||
with tf_logging.DynamicLoggingListener(delete_listener_tf,
|
||||
log=LOG):
|
||||
delete_listener_tf.run()
|
||||
|
||||
def update_listener(self, listener_id, listener_updates):
|
||||
"""Updates a listener.
|
||||
|
||||
:param listener_id: ID of the listener to update
|
||||
:param listener_updates: Dict containing updated listener attributes
|
||||
:returns: None
|
||||
:raises ListenerNotFound: The referenced listener was not found
|
||||
"""
|
||||
listener = None
|
||||
try:
|
||||
listener = self._get_db_obj_until_pending_update(
|
||||
self._listener_repo, listener_id)
|
||||
except tenacity.RetryError as e:
|
||||
LOG.warning('Listener did not go into %s in 60 seconds. '
|
||||
'This either due to an in-progress Octavia upgrade '
|
||||
'or an overloaded and failing database. Assuming '
|
||||
'an upgrade is in progress and continuing.',
|
||||
constants.PENDING_UPDATE)
|
||||
listener = e.last_attempt.result()
|
||||
|
||||
load_balancer = listener.load_balancer
|
||||
|
||||
update_listener_tf = self._taskflow_load(self._listener_flows.
|
||||
get_update_listener_flow(),
|
||||
store={constants.LISTENER:
|
||||
listener,
|
||||
constants.LOADBALANCER:
|
||||
load_balancer,
|
||||
constants.UPDATE_DICT:
|
||||
listener_updates,
|
||||
constants.LISTENERS:
|
||||
[listener]})
|
||||
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
|
||||
update_listener_tf.run()
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
||||
wait=tenacity.wait_incrementing(
|
||||
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
|
||||
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
|
||||
def create_load_balancer(self, load_balancer_id, flavor=None):
|
||||
"""Creates a load balancer by allocating Amphorae.
|
||||
|
||||
First tries to allocate an existing Amphora in READY state.
|
||||
If none are available it will attempt to build one specifically
|
||||
for this load balancer.
|
||||
|
||||
:param load_balancer_id: ID of the load balancer to create
|
||||
:returns: None
|
||||
:raises NoResultFound: Unable to find the object
|
||||
"""
|
||||
lb = self._lb_repo.get(db_apis.get_session(), id=load_balancer_id)
|
||||
if not lb:
|
||||
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
|
||||
'60 seconds.', 'load_balancer', load_balancer_id)
|
||||
raise db_exceptions.NoResultFound
|
||||
|
||||
# TODO(johnsom) convert this to octavia_lib constant flavor
|
||||
# once octavia is transitioned to use octavia_lib
|
||||
store = {constants.LOADBALANCER_ID: load_balancer_id,
|
||||
constants.BUILD_TYPE_PRIORITY:
|
||||
constants.LB_CREATE_NORMAL_PRIORITY,
|
||||
constants.FLAVOR: flavor}
|
||||
|
||||
topology = lb.topology
|
||||
|
||||
store[constants.UPDATE_DICT] = {
|
||||
constants.TOPOLOGY: topology
|
||||
}
|
||||
|
||||
create_lb_flow = self._lb_flows.get_create_load_balancer_flow(
|
||||
topology=topology, listeners=lb.listeners)
|
||||
|
||||
create_lb_tf = self._taskflow_load(create_lb_flow, store=store)
|
||||
with tf_logging.DynamicLoggingListener(create_lb_tf, log=LOG):
|
||||
create_lb_tf.run()
|
||||
|
||||
def delete_load_balancer(self, load_balancer_id, cascade=False):
|
||||
"""Deletes a load balancer by de-allocating Amphorae.
|
||||
|
||||
:param load_balancer_id: ID of the load balancer to delete
|
||||
:returns: None
|
||||
:raises LBNotFound: The referenced load balancer was not found
|
||||
"""
|
||||
lb = self._lb_repo.get(db_apis.get_session(),
|
||||
id=load_balancer_id)
|
||||
|
||||
if cascade:
|
||||
(flow,
|
||||
store) = self._lb_flows.get_cascade_delete_load_balancer_flow(lb)
|
||||
else:
|
||||
(flow, store) = self._lb_flows.get_delete_load_balancer_flow(lb)
|
||||
store.update({constants.LOADBALANCER: lb,
|
||||
constants.SERVER_GROUP_ID: lb.server_group_id})
|
||||
delete_lb_tf = self._taskflow_load(flow, store=store)
|
||||
|
||||
with tf_logging.DynamicLoggingListener(delete_lb_tf,
|
||||
log=LOG):
|
||||
delete_lb_tf.run()
|
||||
|
||||
def update_load_balancer(self, load_balancer_id, load_balancer_updates):
|
||||
"""Updates a load balancer.
|
||||
|
||||
:param load_balancer_id: ID of the load balancer to update
|
||||
:param load_balancer_updates: |