Implements Octavia Controller Worker

Co-Authored-By: Aishwarya Thangappa <aishwarya.thangappa@gmail.com>
Co-Authored-By: German Eichberger <german.eichberger@hp.com>
Implements: blueprint controller-worker
Change-Id: If44a70d6ada43673d827987081e7c760598523bd
changes/96/151496/37
Michael Johnson 8 years ago
parent fac74ad2d6
commit 424c320b01

@ -7,6 +7,8 @@
# bind_port = 9876
# api_handler = simulated_handler
# nova_region_name =
[database]
# This line MUST be changed to actually run the plugin.
# Example:
@ -57,3 +59,32 @@
# base_log_dir = /logs
# connection_max_retries = 10
# connection_retry_threshold = 5
[controller_worker]
# amp_active_wait_sec = 10
# Nova parameters to use when booting amphora
# amp_flavor_id =
# amp_image_id =
# amp_ssh_key =
# amp_network =
# amp_secgroup_list =
# Amphora driver options are amphora_noop_driver,
# amphora_haproxy_rest_driver,
# amphora_haproxy_ssh_driver
#
# amphora_driver = amphora_noop_driver
#
# Compute driver options are compute_noop_driver
# compute_nova_driver
#
# compute_driver = compute_noop_driver
#
# Network driver options are network_noop_driver
# allowed_address_pairs_driver
#
# network_driver = network_noop_driver
[task_flow]
# engine = serial
# max_workers = 5

@ -0,0 +1,45 @@
# Copyright 2014-2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import concurrent.futures
from oslo.config import cfg
from taskflow import engines as tf_engines
CONF = cfg.CONF
CONF.import_group('task_flow', 'octavia.common.config')
class BaseTaskFlowEngine(object):
"""This is the task flow engine
Use this engine to start/load flows in the
code
"""
def __init__(self):
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=CONF.task_flow.max_workers)
def _taskflow_load(self, flow, **kwargs):
eng = tf_engines.load(
flow,
engine_conf=CONF.task_flow.engine,
executor=self.executor,
**kwargs)
eng.compile()
eng.prepare()
return eng

@ -104,6 +104,48 @@ haproxy_amphora_opts = [
'attempts.'))
]
controller_worker_opts = [
cfg.IntOpt('amp_active_retries',
default=10,
help=_('Retry attempts to wait for Amphora to become active')),
cfg.IntOpt('amp_active_wait_sec',
default=10,
help=_('Seconds to wait for an Amphora to become active')),
cfg.StrOpt('amp_flavor_id',
default='',
help=_('Nova instance flavor id for the Amphora')),
cfg.StrOpt('amp_image_id',
default='',
help=_('Glance image id for the Amphora image to boot')),
cfg.StrOpt('amp_ssh_key',
default='',
help=_('SSH key to load into the Amphora')),
cfg.StrOpt('amp_network',
default='',
help=_('Network to attach to the Amphora')),
cfg.ListOpt('amp_secgroup_list',
default='',
help=_('List of security groups to attach to the Amphora')),
cfg.StrOpt('amphora_driver',
default='amphora_noop_driver',
help=_('Name of the amphora driver to use')),
cfg.StrOpt('compute_driver',
default='compute_noop_driver',
help=_('Name of the compute driver to use')),
cfg.StrOpt('network_driver',
default='network_noop_driver',
help=_('Name of the network driver to use'))
]
task_flow_opts = [
cfg.StrOpt('engine',
default='serial',
help=_('TaskFlow engine to use')),
cfg.IntOpt('max_workers',
default=5,
help=_('The maximum number of workers'))
]
core_cli_opts = []
# Register the configuration options
@ -111,6 +153,8 @@ cfg.CONF.register_opts(core_opts)
cfg.CONF.register_opts(networking_opts, group='networking')
cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging')
cfg.CONF.register_opts(haproxy_amphora_opts, group='haproxy_amphora')
cfg.CONF.register_opts(controller_worker_opts, group='controller_worker')
cfg.CONF.register_opts(task_flow_opts, group='task_flow')
cfg.CONF.register_cli_opts(core_cli_opts)
cfg.CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token')
cfg.CONF.register_opts(keystone_authtoken_v3_opts,

@ -37,16 +37,37 @@ PROTOCOL_HTTPS = 'HTTPS'
PROTOCOL_TERMINATED_HTTPS = 'TERMINATED_HTTPS'
SUPPORTED_PROTOCOLS = (PROTOCOL_TCP, PROTOCOL_HTTPS, PROTOCOL_HTTP)
# Note: The database Amphora table has a foreign key constraint against
# the provisioning_status table
# Amphora has been allocated to a load balancer
AMPHORA_ALLOCATED = 'ALLOCATED'
# Amphora healthy with listener(s) deployed
# TODO(johnsom) This doesn't exist
AMPHORA_UP = 'UP'
# Amphora unhealthy with listener(s) deployed
# TODO(johnsom) This doesn't exist
AMPHORA_DOWN = 'DOWN'
# Amphora is being built
AMPHORA_BOOTING = 'BOOTING'
# Amphora is ready to be allocated to a load balancer
AMPHORA_READY = 'READY'
ACTIVE = 'ACTIVE'
PENDING_DELETE = 'PENDING_DELETE'
PENDING_UPDATE = 'PENDING_UPDATE'
PENDING_CREATE = 'PENDING_CREATE'
DELETED = 'DELETED'
ERROR = 'ERROR'
SUPPORTED_PROVISIONING_STATUSES = (ACTIVE, PENDING_DELETE, PENDING_CREATE,
SUPPORTED_PROVISIONING_STATUSES = (ACTIVE, AMPHORA_ALLOCATED,
AMPHORA_BOOTING, AMPHORA_READY,
PENDING_DELETE, PENDING_CREATE,
PENDING_UPDATE, DELETED, ERROR)
MUTABLE_STATUSES = (ACTIVE,)
SUPPORTED_AMPHORA_STATUSES = (AMPHORA_ALLOCATED, AMPHORA_UP, AMPHORA_DOWN,
AMPHORA_BOOTING, AMPHORA_READY, DELETED,
PENDING_DELETE)
ONLINE = 'ONLINE'
OFFLINE = 'OFFLINE'
DEGRADED = 'DEGRADED'
@ -56,9 +77,33 @@ SUPPORTED_OPERATING_STATUSES = (ONLINE, OFFLINE, DEGRADED, ERROR)
AMPHORA_VM = 'VM'
SUPPORTED_AMPHORA_TYPES = (AMPHORA_VM,)
AMPHORA_UP = 'UP'
AMPHORA_DOWN = 'DOWN'
SUPPORTED_AMPHORA_STATUSES = (AMPHORA_UP, AMPHORA_DOWN)
# Task/Flow constants
AMPHORA = 'amphora'
DELTA = 'delta'
LISTENER = 'listener'
LOADBALANCER = 'loadbalancer'
NICS = 'nics'
VIP = 'vip'
CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow'
CREATE_AMPHORA_FOR_LB_FLOW = 'octavia-create-amp-for-lb-flow'
CREATE_HEALTH_MONITOR_FLOW = 'octavia-create-health-monitor-flow'
CREATE_LISTENER_FLOW = 'octavia-create-listener_flow'
CREATE_LOADBALANCER_FLOW = 'octavia-create-loadbalancer-flow'
CREATE_MEMBER_FLOW = 'octavia-create-member-flow'
CREATE_POOL_FLOW = 'octavia-create-pool-flow'
DELETE_AMPHORA_FLOW = 'octavia-delete-amphora-flow'
DELETE_HEALTH_MONITOR_FLOW = 'octavia-delete-health-monitor-flow'
DELETE_LISTENER_FLOW = 'octavia-delete-listener_flow'
DELETE_LOADBALANCER_FLOW = 'octavia-delete-loadbalancer-flow'
DELETE_MEMBER_FLOW = 'octavia-delete-member-flow'
DELETE_POOL_FLOW = 'octavia-delete-pool-flow'
LOADBALANCER_NETWORKING_SUBFLOW = 'octavia-new-loadbalancer-net-subflow'
UPDATE_HEALTH_MONITOR_FLOW = 'octavia-update-health-monitor-flow'
UPDATE_LISTENER_FLOW = 'octavia-update-listener-flow'
UPDATE_LOADBALANCER_FLOW = 'octavia-update-loadbalancer-flow'
UPDATE_MEMBER_FLOW = 'octavia-update-member-flow'
UPDATE_POOL_FLOW = 'octavia-update-pool-flow'
NOVA_1 = '1.1'
NOVA_2 = '2'

@ -20,7 +20,7 @@ Octavia base exception handling.
from oslo_utils import excutils
from webob import exc
from octavia.i18n import _LE
from octavia.i18n import _LE, _LI
class OctaviaException(Exception):
@ -151,3 +151,15 @@ class ComputeStatusException(OctaviaException):
class IDAlreadyExists(OctaviaException):
message = _LE('Already an entity with that specified id.')
code = 409
class NoSuitableAmphoraException(OctaviaException):
message = _LE('Unable to allocate an amphora due to: %(msg)s')
# This is an internal use exception for the taskflow work flow
# and will not be exposed to the customer. This means it is a
# normal part of operation while waiting for nova to go active
# on the instance
class ComputeWaitTimeoutException(OctaviaException):
message = _LI('Waiting for compute to go active timeout.')

@ -0,0 +1,469 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import logging
from octavia.common import base_taskflow
from octavia.common import exceptions
from octavia.controller.worker.flows import amphora_flows
from octavia.controller.worker.flows import health_monitor_flows
from octavia.controller.worker.flows import listener_flows
from octavia.controller.worker.flows import load_balancer_flows
from octavia.controller.worker.flows import member_flows
from octavia.controller.worker.flows import pool_flows
from octavia.db import api as db_apis
from octavia.db import repositories as repo
from taskflow.listeners import logging as tf_logging
LOG = logging.getLogger(__name__)
class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
def __init__(self):
self._amphora_flows = amphora_flows.AmphoraFlows()
self._health_monitor_flows = health_monitor_flows.HealthMonitorFlows()
self._lb_flows = load_balancer_flows.LoadBalancerFlows()
self._listener_flows = listener_flows.ListenerFlows()
self._member_flows = member_flows.MemberFlows()
self._pool_flows = pool_flows.PoolFlows()
self._amphora_repo = repo.AmphoraRepository()
self._health_mon_repo = repo.HealthMonitorRepository()
self._lb_repo = repo.LoadBalancerRepository()
self._listener_repo = repo.ListenerRepository()
self._member_repo = repo.MemberRepository()
self._pool_repo = repo.PoolRepository()
super(ControllerWorker, self).__init__()
def create_amphora(self):
"""Creates an Amphora.
:returns: amphora_id
"""
create_amp_tf = self._taskflow_load(self._amphora_flows.
get_create_amphora_flow())
with tf_logging.DynamicLoggingListener(create_amp_tf,
log=LOG):
create_amp_tf.run()
return create_amp_tf.storage.fetch('amphora')
def delete_amphora(self, amphora_id):
"""Deletes an existing Amphora.
:param amphora_id: ID of the amphora to delete
:returns: None
:raises AmphoraNotFound: The referenced Amphora was not found
"""
amphora = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
delete_amp_tf = self._taskflow_load(self._amphora_flows.
get_delete_amphora_flow(),
store={'amphora': amphora})
with tf_logging.DynamicLoggingListener(delete_amp_tf,
log=LOG):
delete_amp_tf.run()
def create_health_monitor(self, health_monitor_id):
"""Creates a health monitor.
:param health_monitor_id: ID of the health monitor to create
:returns: None
:raises NoSuitablePool: Unable to find the node pool
"""
health_mon = self._health_mon_repo.get(db_apis.get_session(),
pool_id=health_monitor_id)
listener = health_mon.pool.listener
health_mon.pool.health_monitor = health_mon
listener.default_pool = health_mon.pool
vip = health_mon.pool.listener.load_balancer.vip
load_balancer = health_mon.pool.listener.load_balancer
create_hm_tf = self._taskflow_load(self._health_monitor_flows.
get_create_health_monitor_flow(),
store={'health_mon': health_mon,
'listener': listener,
'loadbalancer':
load_balancer,
'vip': vip})
with tf_logging.DynamicLoggingListener(create_hm_tf,
log=LOG):
create_hm_tf.run()
def delete_health_monitor(self, health_monitor_id):
"""Deletes a health monitor.
:param health_monitor_id: ID of the health monitor to delete
:returns: None
:raises HMNotFound: The referenced health monitor was not found
"""
health_mon = self._health_mon_repo.get(db_apis.get_session(),
pool_id=health_monitor_id)
listener = health_mon.pool.listener
health_mon.pool.health_monitor = health_mon
listener.default_pool = health_mon.pool
vip = health_mon.pool.listener.load_balancer.vip
delete_hm_tf = self._taskflow_load(self._health_monitor_flows.
get_delete_health_monitor_flow(),
store={'health_mon': health_mon,
'health_mon_id':
health_monitor_id,
'listener': listener,
'vip': vip})
with tf_logging.DynamicLoggingListener(delete_hm_tf,
log=LOG):
delete_hm_tf.run()
def update_health_monitor(self, health_monitor_id, health_monitor_updates):
"""Updates a health monitor.
:param health_monitor_id: ID of the health monitor to update
:param health_monitor_updates: Dict containing updated health monitor
:returns: None
:raises HMNotFound: The referenced health monitor was not found
"""
health_mon = self._health_mon_repo.get(db_apis.get_session(),
pool_id=health_monitor_id)
listener = health_mon.pool.listener
health_mon.pool.health_monitor = health_mon
listener.default_pool = health_mon.pool
vip = health_mon.pool.listener.load_balancer.vip
load_balancer = health_mon.pool.listener.load_balancer
update_hm_tf = self._taskflow_load(self._health_monitor_flows.
get_update_health_monitor_flow(),
store={'health_mon': health_mon,
'listener': listener,
'loadbalancer':
load_balancer,
'vip': vip,
'update_dict':
health_monitor_updates})
with tf_logging.DynamicLoggingListener(update_hm_tf,
log=LOG):
update_hm_tf.run()
def create_listener(self, listener_id):
"""Creates a listener.
:param listener_id: ID of the listener to create
:returns: None
:raises NoSuitableLB: Unable to find the load balancer
"""
listener = self._listener_repo.get(db_apis.get_session(),
id=listener_id)
load_balancer = listener.load_balancer
vip = listener.load_balancer.vip
create_listener_tf = self._taskflow_load(self._listener_flows.
get_create_listener_flow(),
store={'listener': listener,
'loadbalancer':
load_balancer,
'vip': vip})
with tf_logging.DynamicLoggingListener(create_listener_tf,
log=LOG):
create_listener_tf.run()
def delete_listener(self, listener_id):
"""Deletes a listener.
:param listener_id: ID of the listener to delete
:returns: None
:raises ListenerNotFound: The referenced listener was not found
"""
listener = self._listener_repo.get(db_apis.get_session(),
id=listener_id)
vip = listener.load_balancer.vip
delete_listener_tf = self._taskflow_load(self._listener_flows.
get_delete_listener_flow(),
store={'listener': listener,
'vip': vip})
with tf_logging.DynamicLoggingListener(delete_listener_tf,
log=LOG):
delete_listener_tf.run()
def update_listener(self, listener_id, listener_updates):
"""Updates a listener.
:param listener_id: ID of the listener to update
:param listener_updates: Dict containing updated listener attributes
:returns: None
:raises ListenerNotFound: The referenced listener was not found
"""
listener = self._listener_repo.get(db_apis.get_session(),
id=listener_id)
load_balancer = listener.load_balancer
vip = listener.load_balancer.vip
update_listener_tf = self._taskflow_load(self._listener_flows.
get_update_listener_flow(),
store={'listener': listener,
'vip': vip,
'loadbalancer':
load_balancer,
'update_dict':
listener_updates})
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
update_listener_tf.run()
def create_load_balancer(self, load_balancer_id):
"""Creates a load balancer by allocating Amphorae.
First tries to allocate an existing Amphora in READY state.
If none are available it will attempt to build one specificly
for this load balancer.
:param load_balancer_id: ID of the load balancer to create
:returns: None
:raises NoSuitableAmphoraException: Unable to allocate an Amphora.
"""
# Note this is a bit strange in how it handles building
# Amphora if there are no spares. TaskFlow has a spec for
# a conditional flow that would make this cleaner once implemented.
# https://review.openstack.org/#/c/98946/
lb = self._lb_repo.get(db_apis.get_session(),
id=load_balancer_id)
create_lb_tf = self._taskflow_load(self._lb_flows.
get_create_load_balancer_flow(),
store={'loadbalancer': lb})
with tf_logging.DynamicLoggingListener(create_lb_tf,
log=LOG):
amp = None
try:
create_lb_tf.run()
amp = create_lb_tf.storage.fetch('amphora')
except Exception:
pass
if amp is None:
create_amp_lb_tf = self._taskflow_load(
self._amphora_flows.get_create_amphora_for_lb_flow(),
store={'loadbalancer': lb})
with tf_logging.DynamicLoggingListener(create_amp_lb_tf,
log=LOG):
try:
create_amp_lb_tf.run()
except exceptions.ComputeBuildException as e:
raise exceptions.NoSuitableAmphoraException(msg=e.msg)
def delete_load_balancer(self, load_balancer_id):
"""Deletes a load balancer by de-allocating Amphorae.
:param load_balancer_id: ID of the load balancer to delete
:returns: None
:raises LBNotFound: The referenced load balancer was not found
"""
lb = self._lb_repo.get(db_apis.get_session(),
id=load_balancer_id)
delete_lb_tf = self._taskflow_load(self._lb_flows.
get_delete_load_balancer_flow(),
store={'loadbalancer': lb})
with tf_logging.DynamicLoggingListener(delete_lb_tf,
log=LOG):
delete_lb_tf.run()
def update_load_balancer(self, load_balancer_id, load_balancer_updates):
"""Updates a load balancer.
:param load_balancer_id: ID of the load balancer to update
:param load_balancer_updates: Dict containing updated load balancer
:returns: None
:raises LBNotFound: The referenced load balancer was not found
"""
lb = self._lb_repo.get(db_apis.get_session(),
id=load_balancer_id)
update_lb_tf = self._taskflow_load(self._lb_flows.
get_update_load_balancer_flow(),
store={'loadbalancer': lb})
with tf_logging.DynamicLoggingListener(update_lb_tf,
log=LOG):
update_lb_tf.run()
def create_member(self, member_id):
"""Creates a pool member.
:param member_id: ID of the member to create
:returns: None
:raises NoSuitablePool: Unable to find the node pool
"""
member = self._member_repo.get(db_apis.get_session(),
id=member_id)
listener = member.pool.listener
listener.default_pool = member.pool
load_balancer = listener.load_balancer
vip = listener.load_balancer.vip
create_member_tf = self._taskflow_load(self._member_flows.
get_create_member_flow(),
store={'member': member,
'listener': listener,
'loadbalancer':
load_balancer,
'vip': vip})
with tf_logging.DynamicLoggingListener(create_member_tf,
log=LOG):
create_member_tf.run()
def delete_member(self, member_id):
"""Deletes a pool member.
:param memberr_id: ID of the member to delete
:returns: None
:raises MemberNotFound: The referenced member was not found
"""
member = self._member_repo.get(db_apis.get_session(),
id=member_id)
listener = member.pool.listener
listener.default_pool = member.pool
vip = listener.load_balancer.vip
delete_member_tf = self._taskflow_load(self._member_flows.
get_delete_member_flow(),
store={'member': member,
'member_id': member_id,
'listener': listener,
'vip': vip})
with tf_logging.DynamicLoggingListener(delete_member_tf,
log=LOG):
delete_member_tf.run()
def update_member(self, member_id, member_updates):
"""Updates a pool member.
:param member_id: ID of the member to update
:param member_updates: Dict containing updated member attributes
:returns: None
:raises MemberNotFound: The referenced member was not found
"""
member = self._member_repo.get(db_apis.get_session(),
id=member_id)
listener = member.pool.listener
listener.default_pool = member.pool
load_balancer = listener.load_balancer
vip = listener.load_balancer.vip
update_member_tf = self._taskflow_load(self._member_flows.
get_update_member_flow(),
store={'member': member,
'listener': listener,
'loadbalancer':
load_balancer,
'vip': vip,
'update_dict':
member_updates})
with tf_logging.DynamicLoggingListener(update_member_tf,
log=LOG):
update_member_tf.run()
def create_pool(self, pool_id):
"""Creates a node pool.
:param pool_id: ID of the pool to create
:returns: None
:raises NoSuitableLB: Unable to find the load balancer
"""
pool = self._pool_repo.get(db_apis.get_session(),
id=pool_id)
listener = pool.listener
listener.default_pool = pool
load_balancer = listener.load_balancer
vip = listener.load_balancer.vip
create_pool_tf = self._taskflow_load(self._pool_flows.
get_create_pool_flow(),
store={'pool': pool,
'listener': listener,
'loadbalancer':
load_balancer,
'vip': vip})
with tf_logging.DynamicLoggingListener(create_pool_tf,
log=LOG):
create_pool_tf.run()
def delete_pool(self, pool_id):
"""Deletes a node pool.
:param pool_id: ID of the pool to delete
:returns: None
:raises PoolNotFound: The referenced pool was not found
"""
pool = self._pool_repo.get(db_apis.get_session(),
id=pool_id)
listener = pool.listener
listener.default_pool = pool
vip = listener.load_balancer.vip
delete_pool_tf = self._taskflow_load(self._pool_flows.
get_delete_pool_flow(),
store={'pool': pool,
'pool_id': pool_id,
'listener': listener,
'vip': vip})
with tf_logging.DynamicLoggingListener(delete_pool_tf,
log=LOG):
delete_pool_tf.run()
def update_pool(self, pool_id, pool_updates):
"""Updates a node pool.
:param pool_id: ID of the pool to update
:param pool_updates: Dict containing updated pool attributes
:returns: None
:raises PoolNotFound: The referenced pool was not found
"""
pool = self._pool_repo.get(db_apis.get_session(),
id=pool_id)
listener = pool.listener
listener.default_pool = pool
load_balancer = listener.load_balancer
vip = listener.load_balancer.vip
update_pool_tf = self._taskflow_load(self._pool_flows.
get_update_pool_flow(),
store={'pool': pool,
'listener': listener,
'loadbalancer':
load_balancer,
'vip': vip,
'update_dict':
pool_updates})
with tf_logging.DynamicLoggingListener(update_pool_tf,
log=LOG):
update_pool_tf.run()

@ -0,0 +1,115 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo.config import cfg
from taskflow.patterns import linear_flow
from taskflow import retry
from octavia.common import constants
from octavia.controller.worker.flows import load_balancer_flows
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import compute_tasks
from octavia.controller.worker.tasks import controller_tasks
from octavia.controller.worker.tasks import database_tasks
CONF = cfg.CONF
CONF.import_group('controller_worker', 'octavia.common.config')
class AmphoraFlows(object):
def __init__(self):
self._lb_flows = load_balancer_flows.LoadBalancerFlows()
def get_create_amphora_flow(self):
"""Creates a flow to create an amphora.
Ideally that should be configurable in the
config file - a db session needs to be placed
into the flow
:returns: The flow for creating the amphora
"""
create_amphora_flow = linear_flow.Flow(constants.CREATE_AMPHORA_FLOW)
create_amphora_flow.add(database_tasks.CreateAmphoraInDB(
provides='amphora'))
create_amphora_flow.add(compute_tasks.ComputeCreate())
create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB())
wait_flow = linear_flow.Flow('wait_for_amphora',
retry=retry.Times(CONF.
controller_worker.
amp_active_retries))
wait_flow.add(compute_tasks.ComputeWait())
create_amphora_flow.add(wait_flow)
create_amphora_flow.add(amphora_driver_tasks.AmphoraFinalize())
create_amphora_flow.add(database_tasks.MarkAmphoraReadyInDB())
return create_amphora_flow
def get_create_amphora_for_lb_flow(self):
"""Creates a flow to create an amphora for a load balancer.
This flow is used when there are no spare amphora available
for a new load balancer. It builds an amphora and allocates
for the specific load balancer.
:returns: The The flow for creating the amphora
"""
create_amp_for_lb_flow = linear_flow.Flow(constants.
CREATE_AMPHORA_FOR_LB_FLOW)
create_amp_for_lb_flow.add(database_tasks.CreateAmphoraInDB())
create_amp_for_lb_flow.add(compute_tasks.ComputeCreate())
create_amp_for_lb_flow.add(database_tasks.MarkAmphoraBootingInDB())
wait_flow = linear_flow.Flow('wait_for_amphora',
retry=retry.Times(CONF.
controller_worker.
amp_active_retries))
wait_flow.add(compute_tasks.ComputeWait())
create_amp_for_lb_flow.add(wait_flow)
create_amp_for_lb_flow.add(amphora_driver_tasks.
AmphoraFinalize())
create_amp_for_lb_flow.add(database_tasks.
MarkAmphoraAllocatedInDB(
requires='loadbalancer'))
create_amp_for_lb_flow.add(database_tasks.GetAmphoraByID(
requires='amphora_id',
provides='amphora'))
create_amp_for_lb_flow.add(database_tasks.GetLoadbalancerByID(
requires='loadbalancer_id',
provides='loadbalancer'))
new_LB_net_subflow = self._lb_flows.get_new_LB_networking_subflow()
create_amp_for_lb_flow.add(new_LB_net_subflow)
create_amp_for_lb_flow.add(database_tasks.MarkLBActiveInDB(
requires='loadbalancer'))
return create_amp_for_lb_flow
def get_delete_amphora_flow(self):
"""Creates a flow to delete an amphora.
This should be configurable in the config file
:returns: The flow for deleting the amphora
:raises AmphoraNotFound: The referenced Amphora was not found
"""
delete_amphora_flow = linear_flow.Flow(constants.DELETE_AMPHORA_FLOW)
delete_amphora_flow.add(controller_tasks.DeleteLoadBalancersOnAmp(
requires='amphora'))
# TODO(johnsom) make this just delete it
delete_amphora_flow.add(database_tasks.
MarkAmphoraPendingDeleteInDB(
requires='amphora'))
return delete_amphora_flow

@ -0,0 +1,73 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import model_tasks
class HealthMonitorFlows(object):
def get_create_health_monitor_flow(self):
"""Create a flow to create a health monitor
:returns: The flow for creating a health monitor
"""
create_hm_flow = linear_flow.Flow(constants.CREATE_HEALTH_MONITOR_FLOW)
create_hm_flow.add(amphora_driver_tasks.ListenerUpdate(
requires=['listener', 'vip']))
create_hm_flow.add(database_tasks.MarkLBAndListenerActiveInDB(
requires=['loadbalancer', 'listener']))
return create_hm_flow
def get_delete_health_monitor_flow(self):
"""Create a flow to delete a health monitor
:returns: The flow for deleting a health monitor
"""
delete_hm_flow = linear_flow.Flow(constants.DELETE_HEALTH_MONITOR_FLOW)
delete_hm_flow.add(model_tasks.
DeleteModelObject(rebind={'object': 'health_mon'}))
delete_hm_flow.add(amphora_driver_tasks.ListenerUpdate(
requires=['listener', 'vip']))
delete_hm_flow.add(database_tasks.DeleteHealthMonitorInDB(
requires='health_mon_id'))
delete_hm_flow.add(database_tasks.MarkLBAndListenerActiveInDB(
requires=['loadbalancer', 'listener']))
return delete_hm_flow
def get_update_health_monitor_flow(self):
"""Create a flow to update a health monitor
:returns: The flow for updating a health monitor
"""
update_hm_flow = linear_flow.Flow(constants.UPDATE_HEALTH_MONITOR_FLOW)
update_hm_flow.add(model_tasks.
UpdateAttributes(
rebind={'object': 'health_mon'},
requires=['update_dict']))
update_hm_flow.add(amphora_driver_tasks.ListenerUpdate(
requires=['listener', 'vip']))
update_hm_flow.add(database_tasks.UpdateHealthMonInDB(
requires=['health_mon', 'update_dict']))
update_hm_flow.add(database_tasks.MarkLBAndListenerActiveInDB(
requires=['loadbalancer', 'listener']))
return update_hm_flow

@ -0,0 +1,73 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import model_tasks
class ListenerFlows(object):
def get_create_listener_flow(self):
"""Create a flow to create a listener
:returns: The flow for creating a listener
"""
create_listener_flow = linear_flow.Flow(constants.CREATE_LISTENER_FLOW)
create_listener_flow.add(amphora_driver_tasks.ListenerUpdate(
requires=['listener', 'vip']))
create_listener_flow.add(database_tasks.
MarkLBAndListenerActiveInDB(
requires=['loadbalancer', 'listener']))
return create_listener_flow
def get_delete_listener_flow(self):
"""Create a flow to delete a listener
:returns: The flow for deleting a listener
"""
delete_listener_flow = linear_flow.Flow(constants.DELETE_LISTENER_FLOW)
delete_listener_flow.add(amphora_driver_tasks.ListenerDelete(
requires=['listener', 'vip']))
delete_listener_flow.add(database_tasks.MarkListenerDeletedInDB(
requires='listener'))
delete_listener_flow.add(database_tasks.
MarkLBActiveInDB(requires='loadbalancer'))
return delete_listener_flow
def get_update_listener_flow(self):
"""Create a flow to update a listener
:returns: The flow for updating a listener
"""
update_listener_flow = linear_flow.Flow(constants.UPDATE_LISTENER_FLOW)
update_listener_flow.add(model_tasks.
UpdateAttributes(
rebind={'object': 'listener'},
requires=['update_dict']))
update_listener_flow.add(amphora_driver_tasks.ListenerUpdate(
requires=['listener', 'vip']))
update_listener_flow.add(database_tasks.UpdateListenerInDB(
requires=['listener', 'update_dict']))
update_listener_flow.add(database_tasks.
MarkLBAndListenerActiveInDB(
requires=['loadbalancer', 'listener']))
return update_listener_flow

@ -0,0 +1,108 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo.config import cfg
from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import controller_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import network_tasks
CONF = cfg.CONF
CONF.import_group('controller_worker', 'octavia.common.config')
class LoadBalancerFlows(object):
def get_create_load_balancer_flow(self):
"""Creates a flow to create a load balancer.
:returns: The flow for creating a load balancer
"""
# Note this flow is a bit strange in how it handles building
# Amphora if there are no spares. TaskFlow has a spec for
# a conditional flow that would make this cleaner once implemented.
# https://review.openstack.org/#/c/98946/
create_LB_flow = linear_flow.Flow(constants.CREATE_LOADBALANCER_FLOW)
create_LB_flow.add(database_tasks.MapLoadbalancerToAmphora(
requires='loadbalancer',
provides='amphora'))
create_LB_flow.add(database_tasks.GetAmphoraByID(
requires='amphora_id',
provides='amphora'))
create_LB_flow.add(database_tasks.GetLoadbalancerByID(
requires='loadbalancer_id',
provides='loadbalancer'))
new_LB_net_subflow = self.get_new_LB_networking_subflow()
create_LB_flow.add(new_LB_net_subflow)
create_LB_flow.add(database_tasks.MarkLBActiveInDB(
requires='loadbalancer'))
return create_LB_flow
def get_delete_load_balancer_flow(self):
"""Creates a flow to delete a load balancer.
:returns: The flow for deleting a load balancer
"""
delete_LB_flow = linear_flow.Flow(constants.DELETE_LOADBALANCER_FLOW)
delete_LB_flow.add(controller_tasks.DeleteListenersOnLB(
requires='loadbalancer'))
# TODO(johnsom) tear down the unplug vips? and networks
delete_LB_flow.add(database_tasks.MarkLBDeletedInDB(
requires='loadbalancer'))
return delete_LB_flow
def get_new_LB_networking_subflow(self):
"""Create a sub-flow to setup networking.
:returns: The flow to setup networking for a new amphora
"""
new_LB_net_subflow = linear_flow.Flow(constants.
LOADBALANCER_NETWORKING_SUBFLOW)
new_LB_net_subflow.add(network_tasks.GetPlumbedNetworks(
requires='amphora',
provides='nics'))
new_LB_net_subflow.add(network_tasks.CalculateDelta(
requires=['amphora', 'nics'],
provides='delta'))
new_LB_net_subflow.add(network_tasks.PlugNetworks(
requires=['amphora', 'delta']))
new_LB_net_subflow.add(amphora_driver_tasks.AmphoraPostNetworkPlug(
requires='amphora'))
new_LB_net_subflow.add(network_tasks.PlugVIP(requires='amphora'))
new_LB_net_subflow.add(amphora_driver_tasks.AmphoraPostVIPPlug(
requires='loadbalancer'))
return new_LB_net_subflow
def get_update_load_balancer_flow(self):
"""Creates a flow to update a load balancer.
:returns: The flow for update a load balancer
"""
update_LB_flow = linear_flow.Flow(constants.UPDATE_LOADBALANCER_FLOW)
update_LB_flow.add(controller_tasks.DisableEnableLB(
requires='loadbalancer'))
update_LB_flow.add(database_tasks.MarkLBActiveInDB(
requires='loadbalancer'))
return update_LB_flow

@ -0,0 +1,76 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import model_tasks
class MemberFlows(object):
def get_create_member_flow(self):
"""Create a flow to create a member
:returns: The flow for creating a member
"""
create_member_flow = linear_flow.Flow(constants.CREATE_MEMBER_FLOW)
create_member_flow.add(amphora_driver_tasks.ListenerUpdate(
requires=['listener', 'vip']))
create_member_flow.add(database_tasks.
MarkLBAndListenerActiveInDB(
requires=['loadbalancer', 'listener']))
return create_member_flow
def get_delete_member_flow(self):
"""Create a flow to delete a member
:returns: The flow for deleting a member
"""
delete_member_flow = linear_flow.Flow(constants.DELETE_MEMBER_FLOW)
delete_member_flow.add(model_tasks.
DeleteModelObject(rebind={'object': 'member'}))
delete_member_flow.add(amphora_driver_tasks.ListenerUpdate(
requires=['listener', 'vip']))
delete_member_flow.add(database_tasks.DeleteMemberInDB(
requires='member_id'))
delete_member_flow.add(database_tasks.
MarkLBAndListenerActiveInDB(
requires=['loadbalancer', 'listener']))
return delete_member_flow
def get_update_member_flow(self):
"""Create a flow to update a member
:returns: The flow for updating a member
"""
update_member_flow = linear_flow.Flow(constants.UPDATE_MEMBER_FLOW)
update_member_flow.add(model_tasks.
UpdateAttributes(
rebind={'object': 'member'},
requires=['update_dict']))
update_member_flow.add(amphora_driver_tasks.ListenerUpdate(
requires=['listener', 'vip']))
update_member_flow.add(database_tasks.UpdateMemberInDB(
requires=['member', 'update_dict']))
update_member_flow.add(database_tasks.
MarkLBAndListenerActiveInDB(
requires=['loadbalancer', 'listener']))
return update_member_flow

@ -0,0 +1,72 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import model_tasks
class PoolFlows(object):
def get_create_pool_flow(self):
"""Create a flow to create a pool
:returns: The flow for creating a pool
"""
create_pool_flow = linear_flow.Flow(constants.CREATE_POOL_FLOW)
create_pool_flow.add(amphora_driver_tasks.ListenerUpdate(
requires=['listener', 'vip']))
create_pool_flow.add(database_tasks.MarkLBAndListenerActiveInDB(
requires=['loadbalancer', 'listener']))
return create_pool_flow
def get_delete_pool_flow(self):
"""Create a flow to delete a pool
:returns: The flow for deleting a pool
"""
delete_pool_flow = linear_flow.Flow(constants.DELETE_POOL_FLOW)
delete_pool_flow.add(model_tasks.
DeleteModelObject(rebind={'object': 'pool'}))
delete_pool_flow.add(amphora_driver_tasks.ListenerUpdate(
requires=['listener', 'vip']))
delete_pool_flow.add(database_tasks.DeletePoolInDB(requires='pool_id'))
delete_pool_flow.add(database_tasks.MarkLBAndListenerActiveInDB(
requires=['loadbalancer', 'listener']))
return delete_pool_flow
def get_update_pool_flow(self):
"""Create a flow to update a pool
:returns: The flow for updating a pool
"""
update_pool_flow = linear_flow.Flow(constants.UPDATE_POOL_FLOW)
update_pool_flow.add(model_tasks.
UpdateAttributes(
rebind={'object': 'pool'},
requires=['update_dict']))
update_pool_flow.add(amphora_driver_tasks.ListenerUpdate(
requires=['listener', 'vip']))
update_pool_flow.add(database_tasks.UpdatePoolInDB(
requires=['pool', 'update_dict']))
update_pool_flow.add(database_tasks.MarkLBAndListenerActiveInDB(
requires=['loadbalancer', 'listener']))
return update_pool_flow

@ -0,0 +1,179 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import logging
from oslo.config import cfg
from stevedore import driver as stevedore_driver
from taskflow import task
from octavia.common import constants
from octavia.db import api as db_apis
from octavia.db import repositories as repo
from octavia.i18n import _LW
CONF = cfg.CONF
CONF.import_group('controller_worker', 'octavia.common.config')
LOG = logging.getLogger(__name__)
class BaseAmphoraTask(task.Task):
"""Base task to load drivers common to the tasks."""
def __init__(self, **kwargs):
super(BaseAmphoraTask, self).__init__(**kwargs)
self.amphora_driver = stevedore_driver.DriverManager(
namespace='octavia.amphora.drivers',
name=CONF.controller_worker.amphora_driver,
invoke_on_load=True
).driver
self.amphora_repo = repo.AmphoraRepository()
self.listener_repo = repo.ListenerRepository()
self.loadbalancer_repo = repo.LoadBalancerRepository()
class ListenerUpdate(BaseAmphoraTask):
"""Task to update an amphora with new configuration for the listener."""
def execute(self, listener, vip):
"""Execute listener update routines for an amphora."""
self.amphora_driver.update(listener, vip)
LOG.debug("Updated amphora with new configuration for listener")
def revert(self, listener, *args, **kwargs):
"""Handle a failed listener update."""
LOG.warn(_LW("Reverting listener update."))
self.listener_repo.update(db_apis.get_session(), id=listener.id,
provisioning_status=constants.ERROR)
return None
class ListenerStop(BaseAmphoraTask):
"""Task to stop the listener on the vip."""
def execute(self, listener, vip):
"""Execute listener stop routines for an amphora."""
self.amphora_driver.stop(listener, vip)
LOG.debug("Stopped the listener on the vip")
def revert(self, listener, *args, **kwargs):
"""Handle a failed listener stop."""
LOG.warn(_LW("Reverting listener stop."))
self.listener_repo.update(db_apis.get_session(), id=listener.id,
provisioning_status=constants.ERROR)
return None
class ListenerStart(BaseAmphoraTask):
"""Task to start the listener on the vip."""
def execute(self, listener, vip):
"""Execute listener start routines for an amphora."""
self.amphora_driver.start(listener, vip)
LOG.debug("Started the listener on the vip")
def revert(self, listener, *args, **kwargs):
"""Handle a failed listener start."""
LOG.warn(_LW("Reverting listener start."))
self.listener_repo.update(db_apis.get_session(), id=listener.id,
provisioning_status=constants.ERROR)
return None
class ListenerDelete(BaseAmphoraTask):
"""Task to delete the listener on the vip."""
def execute(self, listener, vip):
"""Execute listener delete routines for an amphora."""
self.amphora_driver.delete(listener, vip)
LOG.debug("Deleted the listener on the vip")
def revert(self, listener, *args, **kwargs):
"""Handle a failed listener delete."""
LOG.warn(_LW("Reverting listener delete."))
self.listener_repo.update(db_apis.get_session(), id=listener.id,
provisioning_status=constants.ERROR)
return None
class AmphoraGetInfo(BaseAmphoraTask):
"""Task to get information on an amphora."""
def execute(self, amphora):
"""Execute get_info routine for an amphora."""
self.amphora_driver.get_info(amphora)
class AmphoraGetDiagnostics(BaseAmphoraTask):
"""Task to get diagnostics on the amphora and the loadbalancers."""
def execute(self, amphora):
"""Execute get_diagnostic routine for an amphora."""
self.amphora_driver.get_diagnostics(amphora)
class AmphoraFinalize(BaseAmphoraTask):
"""Task to finalize the amphora before any listeners are configured."""
def execute(self, amphora):
"""Execute finalize_amphora routine."""
self.amphora_driver.finalize_amphora(amphora)
LOG.debug("Finalized the amphora.")
def revert(self, amphora, *args, **kwargs):
"""Handle a failed amphora finalize."""
LOG.warn(_LW("Reverting amphora finalize."))
self.amphora_repo.update(db_apis.get_session(), id=amphora.id,
provisioning_status=constants.ERROR)
return None
class AmphoraPostNetworkPlug(BaseAmphoraTask):
"""Task to notify the amphora post network plug."""
def execute(self, amphora):
"""Execute post_network_plug routine."""
self.amphora_driver.post_network_plug(amphora)
LOG.debug("Posted network plug for the compute instance")
def revert(self, amphora, *args, **kwargs):
"""Handle a failed post network plug."""
LOG.warn(_LW("Reverting post network plug."))
self.amphora_repo.update(db_apis.get_session(), id=amphora.id,
provisioning_status=constants.ERROR)
return None
class AmphoraPostVIPPlug(BaseAmphoraTask):
"""Task to notify the amphora post VIP plug."""
def execute(self, loadbalancer):
"""Execute post_vip_routine."""
self.amphora_driver.post_vip_plug(loadbalancer)
LOG.debug("Notfied amphora of vip plug")
def revert(self, loadbalancer, *args, **kwargs):
"""Handle a failed amphora vip plug notification."""
LOG.warn(_LW("Reverting post vip plug."))
self.loadbalancer_repo.update(db_apis.get_session(),
id=loadbalancer.id,
provisioning_status=constants.ERROR)
return None

@ -0,0 +1,110 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import logging
import time