Tweaking LB policy implementation

This patch is an attempt to refactor the current LB policy
implementation.

Change-Id: Ia03feaa292d372c373ca9f7907171c4a8fc204a8
tengqm 2015-07-03 02:20:24 -04:00
parent b8dda6f907
commit ad349892e0
6 changed files with 285 additions and 286 deletions


@@ -1,14 +1,7 @@
# load-balancing policy spec using Neutron LBaaS service
# NOTE: properties are combined from LB and Pool
# Each Pool member has its own 'address', 'protocol_port', 'weight',
# and 'admin_state_up' property
#### Pool properties
pool:
# Pool ID/name; if given, an existing pool will be used
# pool: <ID>
# Protocol used for load balancing
protocol: HTTP
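
For reference, a sketch of the combined properties once parsed into Python; the key names come from the spec and the policy code later in this patch, while all values here are illustrative assumptions:

# Illustrative only: values are made up, keys mirror this patch.
spec = {
    'pool': {
        'protocol': 'HTTP',
        'protocol_port': 80,
        'subnet': 'private-subnet',
        'lb_method': 'ROUND_ROBIN',
        'admin_state_up': True,
    },
    'vip': {
        'subnet': 'public-subnet',
        'address': None,            # Neutron allocates one when omitted
        'protocol': 'HTTP',
        'protocol_port': 80,
        'connection_limit': 500,
        'admin_state_up': True,
    },
}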


@@ -145,8 +145,8 @@ class KeystoneClient(base.DriverBase):
def get_service_credentials(**kwargs):
'''Senlin service credentials to use with Keystone.
:param args: An additional keyword argument list that can be used
for customizing the default settings.
:param kwargs: An additional keyword argument list that can be used
for customizing the default settings.
'''
creds = {


@@ -15,8 +15,9 @@ import six
from oslo_log import log as logging
from senlin.common import context
from senlin.common.i18n import _
from senlin.common import trust
from senlin.common.i18n import _LE
from senlin.drivers import base
from senlin.drivers.openstack import neutron_v2 as neutronclient
@@ -24,100 +25,99 @@ LOG = logging.getLogger(__name__)
class LoadBalancerDriver(base.DriverBase):
"""Common driver for LoadBalancer management"""
"""Load-balancing driver based on Neutron LBaaS service."""
def __init__(self, context):
super(LoadBalancerDriver, self).__init__(context)
self.context = context
params = trust.get_connection_params(context)
self.nc = neutronclient.NeutronClient(params)
def __init__(self, ctx):
super(LoadBalancerDriver, self).__init__(ctx)
self.ctx = ctx
self._nc = None
def _wait_for_lb_ready(self, lb_data, ignore_not_found=False):
"""Keep waiting until loadbalancer is in ready status
def nc(self):
if self._nc:
return self._nc
This method will keep waiting until loadbalancer resource listed in
lb_data becoming ready which means its provisioning_status is ACTIVE
its operating_status is ONLINE. If ignore_not_found is set to True,
unexisting of loadbalancer resource is also an acceptable result.
self._nc = neutronclient.NeutronClient(self.ctx)
return self._nc
def _wait_for_lb_ready(self, lb_id, timeout=60, ignore_not_found=False):
"""Keep waiting until loadbalancer is ready
This method will keep waiting until loadbalancer resource specified
by lb_id becomes ready, i.e. its provisioning_status is ACTIVE and
its operating_status is ONLINE.
:param lb_id: ID of the load-balancer to check.
:param timeout: timeout in seconds.
:param ignore_not_found: if set to True, nonexistent loadbalancer
resource is also an acceptable result.
"""
loadbalancer_id = lb_data.get('loadbalancer')
while True:
loadbalancer_ready = True
if loadbalancer_id:
lb = self.nc.loadbalancer_get(loadbalancer_id)
if lb is None:
loadbalancer_ready = True if ignore_not_found else False
elif (lb.provisioning_status == 'ACTIVE') and (
lb.operating_status == 'ONLINE'):
loadbalancer_ready = True
else:
loadbalancer_ready = False
if loadbalancer_ready:
return True, lb_data
waited = 0
while waited < timeout:
lb = self.nc().loadbalancer_get(lb_id)
if lb is None:
lb_ready = ignore_not_found
else:
LOG.debug(_('Waiting for loadbalancer %(lb)s becoming ready'
) % {'lb': loadbalancer_id})
lb_ready = ((lb.provisioning_status == 'ACTIVE') and
(lb.operating_status == 'ONLINE'))
eventlet.sleep(2)
if lb_ready is True:
return True
# TODO(Yanyan Hu): Add timeout check.
LOG.debug(_('Waiting for loadbalancer %(lb)s to become ready'),
{'lb': lb_id})
eventlet.sleep(2)
waited += 2
return False
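The new loop is a bounded poll: check, sleep, accumulate waited time, give up at the timeout. The same idea in a standalone, eventlet-free sketch (the helper name and use of time.sleep are assumptions, not part of this patch):

import time

def wait_until(check, timeout=60, interval=2):
    """Poll check() every `interval` seconds until it returns True
    or `timeout` seconds have elapsed; mirrors _wait_for_lb_ready()."""
    waited = 0
    while waited < timeout:
        if check():
            return True
        time.sleep(interval)
        waited += interval
    return False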
def lb_create(self, vip, pool):
"""Create a Neutron lbaas instance"""
"""Create a LBaaS instance
# Catch all exceptions that could happen in each step and gracefully
# remove all LBaaS-related resources that have been created before
# returning.
try:
# Create loadbalancer
lb_data = {}
subnet = vip.get('subnet', None)
subnet_id = (self.nc.subnet_get(subnet)).id
address = vip.get('address', None)
admin_state_up = vip.get('admin_state_up', None)
lb = self.nc.loadbalancer_create(subnet_id, address,
admin_state_up)
lb_data['loadbalancer'] = lb.id
res, reason = self._wait_for_lb_ready(lb_data)
if res is not True:
return res, reason
# Create listener
protocol = vip.get('protocol')
protocol_port = vip.get('protocol_port')
connection_limit = vip.get('connection_limit', None)
listener = self.nc.listener_create(lb.id, protocol, protocol_port,
connection_limit,
admin_state_up)
lb_data['listener'] = listener.id
res, reason = self._wait_for_lb_ready(lb_data)
if res is not True:
return res, reason
# Create pool
lb_algorithm = pool.get('lb_method')
protocol = pool.get('protocol')
admin_state_up = pool.get('admin_state_up')
pool = self.nc.pool_create(lb_algorithm, listener.id, protocol,
admin_state_up)
lb_data['pool'] = pool.id
res, reason = self._wait_for_lb_ready(lb_data)
if res is not True:
return res, reason
except Exception as ex:
self.lb_delete(**lb_data)
msg = _('Failed in creating lb resources: %(ex)s '
) % {'ex': six.text_type(ex)}
:param vip: A dict containing the properties for the VIP;
:param pool: A dict describing the pool of load-balancer members.
"""
def _cleanup(msg, **kwargs):
LOG.error(msg)
self.lb_delete(**kwargs)
return
result = {}
# Create loadbalancer
lb = self.nc().loadbalancer_create(vip['subnet'],
vip.get('address', None),
vip['admin_state_up'])
result['loadbalancer'] = lb.id
res = self._wait_for_lb_ready(lb.id)
if res is False:
msg = _LE('Failed in creating load balancer (%s).') % lb.id
_cleanup(msg, **result)
return False, msg
return True, lb_data
# Create listener
listener = self.nc().listener_create(lb.id, vip['protocol'],
vip['protocol_port'],
vip.get('connection_limit', None),
vip['admin_state_up'])
result['listener'] = listener.id
res = self._wait_for_lb_ready(lb.id)
if res is False:
msg = _LE('Failed in creating listener (%s).') % listener.id
_cleanup(msg, **result)
return res, msg
# Create pool
pool = self.nc().pool_create(pool['lb_method'], listener.id,
pool['protocol'], pool['admin_state_up'])
result['pool'] = pool.id
res = self._wait_for_lb_ready(lb.id)
if res is False:
msg = _LE('Failed in creating pool (%s).') % pool.id
_cleanup(msg, **result)
return res, msg
return True, result
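A sketch of how a caller might drive the new lb_create()/lb_delete() pair; the dict keys mirror the lookups above, while the context object and all values are assumptions:

# Illustrative only; ctx and the values below are assumptions.
vip = {
    'subnet': 'public-subnet',
    'address': None,               # let Neutron allocate the VIP address
    'protocol': 'HTTP',
    'protocol_port': 80,
    'connection_limit': 500,
    'admin_state_up': True,
}
pool = {
    'lb_method': 'ROUND_ROBIN',
    'protocol': 'HTTP',
    'admin_state_up': True,
}
driver = LoadBalancerDriver(ctx)
res, data = driver.lb_create(vip, pool)
if res:
    # data carries the resource IDs needed for teardown
    driver.lb_delete(**data)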
def lb_delete(self, **kwargs):
"""Delete a Neutron lbaas instance
@@ -125,100 +125,75 @@ class LoadBalancerDriver(base.DriverBase):
The following Neutron lbaas resources will be deleted in order:
1) healthmonitor; 2) pool; 3) listener; 4) loadbalancer.
"""
lb_id = kwargs.pop('loadbalancer')
loadbalancer_id = kwargs.get('loadbalancer')
listener_id = kwargs.get('listener')
pool_id = kwargs.get('pool')
healthmonitor_id = kwargs.get('healthmonitor', None)
lb_data = kwargs
healthmonitor_id = kwargs.pop('healthmonitor', None)
if healthmonitor_id:
self.nc().healthmonitor_delete(healthmonitor_id)
self._wait_for_lb_ready(lb_id)
pool_id = kwargs.pop('pool', None)
if pool_id:
self.nc().pool_delete(pool_id)
self._wait_for_lb_ready(lb_id)
listener_id = kwargs.pop('listener', None)
if listener_id:
self.nc().listener_delete(listener_id)
self._wait_for_lb_ready(lb_id)
self.nc().loadbalancer_delete(lb_id)
self._wait_for_lb_ready(lb_id, True)
return True, _('LB deletion succeeded')
def member_add(self, node, lb_id, pool_id, port, subnet):
"""Add a member to Neutron lbaas pool.
:param node: A node object to be added to the specified pool.
:param lb_id: The ID of the loadbalancer.
:param pool_id: The ID of the pool for receiving the node.
:param port: The port for the new LB member to be created.
:param subnet: The subnet to be used by the new LB member.
:returns: The ID of the new LB member or None if errors occurred.
"""
addresses = self._get_node_address(node, version=4)
if not addresses:
LOG.error(_LE('Node (%(n)s) does not have valid IPv4 address.'),
{'n': node.id})
return None
subnet_obj = self.nc().subnet_get(subnet)
net_id = subnet_obj['network_id']
net = self.nc().network_get(net_id)
net_name = net['name']
if net_name not in addresses:
LOG.error(_LE('Node is not in subnet %(subnet)s'),
{'subnet': subnet})
return None
address = addresses[net_name]
member = self.nc().pool_member_create(pool_id, address, port, subnet)
self._wait_for_lb_ready(lb_id)
return member.id
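A hypothetical call site for the new member_add() signature, mirroring how attach() uses it later in this patch (driver, node, lb_id, pool_id, and the subnet name are placeholders, not values from this patch):

# Illustrative only; all identifiers here are placeholders.
member_id = driver.member_add(node, lb_id, pool_id, 80, 'private-subnet')
if member_id is None:
    LOG.error('Failed adding node %s to the pool', node.id)
else:
    node.data.update({'lb_member': member_id})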
def member_remove(self, lb_id, pool_id, member_id):
"""Delete a member from Neutron lbaas pool.
:param lb_id: The ID of the loadbalancer the operation is targeted at;
:param pool_id: The ID of the pool from which the member is deleted;
:param member_id: The ID of the LB member.
:returns: True if the operation succeeded or False if errors occurred.
"""
try:
if healthmonitor_id is not None:
self.nc.healthmonitor_delete(healthmonitor_id)
del lb_data['healthmonitor']
self._wait_for_lb_ready(lb_data, ignore_not_found=True)
self.nc.pool_delete(pool_id)
del lb_data['pool']
self._wait_for_lb_ready(lb_data, ignore_not_found=True)
self.nc.listener_delete(listener_id)
del lb_data['listener']
self._wait_for_lb_ready(lb_data, ignore_not_found=True)
self.nc.loadbalancer_delete(loadbalancer_id)
self.nc().pool_member_delete(pool_id, member_id)
self._wait_for_lb_ready(lb_id)
except Exception as ex:
msg = _('Failed in deleting lb resources %(data)s: %(ex)s'
) % {'data': lb_data, 'ex': six.text_type(ex)}
LOG.error(msg)
return False, msg
return True, 'lb resource deleting succeeded'
def member_add(self, **kwargs):
"""Add a member to Neutron lbaas pool"""
node = kwargs.get('node')
pool_id = kwargs.get('pool_id')
port = kwargs.get('port')
subnet = kwargs.get('subnet')
try:
addresses = self._get_node_address(node, version=4)
if not addresses:
msg = _('Node does not have valid IPv%(version)s address'
) % {'version': 4}
raise Exception(msg)
else:
network_id = (self.nc.subnet_get(subnet))['network_id']
network_name = (self.nc.network_get(network_id))['name']
if network_name in addresses:
address = addresses[network_name]
else:
msg = _('Node is not in subnet %(subnet)s'
) % {'subnet': subnet}
raise Exception(msg)
subnet_id = (self.nc.subnet_get(subnet)).id
pool_member = self.nc.pool_member_create(pool_id, address, port,
subnet_id)
pool = self.nc.pool_get(pool_id)
listener = self.nc.listener_get(pool.listeners[0]['id'])
lb_data = {
'loadbalancer': listener.loadbalancers[0]['id'],
'member': pool_member.id
}
self._wait_for_lb_ready(lb_data)
except Exception as ex:
msg = _('Failed in adding node %(node)s into pool %(pool)s as '
'a member: %(ex)s') % {'node': node.id, 'pool': pool_id,
'ex': six.text_type(ex)}
LOG.error(msg)
return False
return pool_member.id
def member_remove(self, **kwargs):
"""Delete a member from Neutron lbaas pool"""
pool_id = kwargs.get('pool_id')
member_id = kwargs.get('member_id')
try:
self.nc.pool_member_delete(pool_id, member_id)
pool = self.nc.pool_get(pool_id)
listener = self.nc.listener_get(pool.listeners[0]['id'])
lb_data = {
'loadbalancer': listener.loadbalancers[0]['id'],
}
self._wait_for_lb_ready(lb_data)
except Exception as ex:
msg = _('Failed in removing member %(member)s from pool %(pool): '
'%(ex)s') % {'member': member_id, 'pool': pool_id,
'ex': six.test_type(ex)}
LOG.error(msg)
LOG.error(_LE('Failed in removing member %(m)s from pool %(p)s: '
'%(ex)s'),
{'m': member_id, 'p': pool_id, 'ex': six.text_type(ex)})
return False
return True
@@ -226,7 +201,7 @@ class LoadBalancerDriver(base.DriverBase):
def _get_node_address(self, node, version=4):
"""Get IP address of node with specific version"""
node_detail = node.get_details(self.context)
node_detail = node.get_details(context.get_current())
node_addresses = node_detail.get('addresses')
address = {}


@@ -131,11 +131,11 @@ class Policy(object):
'''Validate the schema and the data provided.'''
self.spec_data.validate()
def attach(self, cluster_id, action):
def attach(self, cluster_id):
'''Method to be invoked before policy is attached to a cluster.'''
return True
def detach(self, cluster_id, action):
def detach(self, cluster_id):
'''Method to be invoked before policy is detached from a cluster.'''
return True
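With the action parameter removed from these hooks, a minimal subclass now only needs the cluster ID; a hypothetical example:

class NoopPolicy(Policy):
    """Example policy illustrating the new hook signatures."""

    def attach(self, cluster_id):
        # Create any per-cluster resources here.
        return True

    def detach(self, cluster_id):
        # Release whatever attach() created.
        return True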


@@ -12,6 +12,7 @@
from senlin.common import constraints
from senlin.common import consts
from senlin.common import context
from senlin.common.i18n import _
from senlin.common import schema
from senlin.engine import cluster as cluster_mod
@@ -119,24 +120,24 @@ class HealthPolicy(base.Policy):
self.check_type = self.spec_data[self.DETECTION][self.DETECTION_TYPE]
self.interval = self.spec_data[self.DETECTION][self.CHECK_INTERVAL]
def attach(self, cluster_id, action):
def attach(self, cluster_id):
'''Hook for policy attach.
Initialize the health check mechanism for existing nodes in cluster.
'''
cluster = cluster_mod.Cluster.load(action.context,
cluster = cluster_mod.Cluster.load(context.get_current(),
cluster_id=cluster_id)
cluster.healthy_check_enable()
cluster.healthy_check_set_interval(self.interval)
return True
def detach(self, cluster_id, action):
def detach(self, cluster_id):
'''Hook for policy detach.
Deinitialize the health check mechanism (for the cluster).
'''
cluster = cluster_mod.Cluster.load(action.context,
cluster = cluster_mod.Cluster.load(context.get_current(),
cluster_id=cluster_id)
cluster.healthy_check_disable()
return True


@@ -10,12 +10,17 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_context import context as oslo_context
from oslo_log import log as logging
from senlin.common import constraints
from senlin.common import consts
from senlin.common import context
from senlin.common import exception
from senlin.common.i18n import _
from senlin.common.i18n import _LW
from senlin.common import schema
from senlin.db import api as db_api
from senlin.drivers.openstack import lbaas
from senlin.engine import cluster_policy
from senlin.engine import node as node_mod
@@ -51,10 +56,10 @@ class LoadBalancingPolicy(base.Policy):
)
_POOL_KEYS = (
POOL_ID, POOL_PROTOCOL, POOL_PROTOCOL_PORT, POOL_SUBNET,
POOL_PROTOCOL, POOL_PROTOCOL_PORT, POOL_SUBNET,
POOL_LB_METHOD, POOL_ADMIN_STATE_UP, POOL_SESSION_PERSISTENCE,
) = (
'id', 'protocol', 'protocol_port', 'subnet',
'protocol', 'protocol_port', 'subnet',
'lb_method', 'admin_state_up', 'session_persistence',
)
@@ -94,9 +99,6 @@ class LoadBalancingPolicy(base.Policy):
POOL: schema.Map(
_('LB pool properties.'),
schema={
POOL_ID: schema.String(
_('ID of an existing load-balanced pool.'),
),
POOL_PROTOCOL: schema.String(
_('Protocol used for load balancing.'),
constraints=[
@@ -154,6 +156,7 @@
),
VIP_ADDRESS: schema.String(
_('IP address of the VIP.'),
default=None,
),
VIP_CONNECTION_LIMIT: schema.Integer(
_('Maximum number of connections per second allowed for '
@@ -185,74 +188,98 @@ class LoadBalancingPolicy(base.Policy):
self.validate()
self.lb = None
def attach(self, cluster_id, action):
self.action = action
pool_id = self.pool_spec.get(self.POOL_ID, None)
if pool_id is not None:
data = {
'pool': self.pool_id,
'pool_need_delete': False
}
else:
res, data = self.lb_driver.lb_create(self.vip_spec,
self.pool_spec)
if res is not True:
return res, data
else:
data['pool_need_delete'] = True
def validate(self):
super(LoadBalancingPolicy, self).validate()
# validate that the subnet exists
# subnet = self.nc.subnet_get(vip[self.VIP_SUBNET])
def attach(self, cluster_id):
"""Routine to be invoked when policy is to be attached to a cluster.
:param cluster_id: The ID of the target cluster to be attached to.
:returns: When the operation is successful, returns a tuple of
(True, data) where the data contains references to the resources
created; otherwise returns a tuple of (False, err) where the err
contains an error message.
"""
ctx = self._build_context(cluster_id)
lb_driver = lbaas.LoadBalancerDriver(ctx)
# TODO(Anyone): check cluster profile type matches self.PROFILE or not
res, data = lb_driver.lb_create(self.vip_spec, self.pool_spec)
if res is False:
return res, data
port = self.pool_spec.get(self.POOL_PROTOCOL_PORT)
subnet = self.pool_spec.get(self.POOL_SUBNET)
nodes = node_mod.Node.load_all(action.context, cluster_id=cluster_id)
nodes = node_mod.Node.load_all(context.get_current(),
cluster_id=cluster_id)
for node in nodes:
params = {
'pool_id': data['pool'],
'node': node,
'port': port,
'subnet': subnet
}
member_id = self.lb_driver.member_add(**params)
member_id = lb_driver.member_add(node, data['loadbalancer'],
data['pool'], port, subnet)
if member_id is None:
# Adding member failed, remove all lb resources that
# have been created and return failure reason.
# When failed in adding member, remove all lb resources that
# were created and return the failure reason.
# TODO(Yanyan Hu): Maybe we should tolerate member adding
# failure and allow policy attaching to succeed without
# all nodes being added into lb pool?
self.lb_driver.lb_delete(**data)
return False, 'Failed in adding existed node into lb pool'
else:
node.data.update({'lb_member': member_id})
node.store(action.context)
lb_driver.lb_delete(**data)
return False, 'Failed in adding node into lb pool'
node.data.update({'lb_member': member_id})
node.store(context.get_current())
return True, data
def detach(self, cluster_id, action):
res = True
self.action = action
cp = cluster_policy.ClusterPolicy.load(action.context, cluster_id,
self.id)
def detach(self, cluster_id):
"""Routine to be called when the policy is detached from a cluster.
if cp.data['pool_need_delete']:
res, reason = self.lb_driver.lb_delete(**cp.data)
:param cluster_id: The ID of the cluster from which the policy is to
be detached.
:returns: When the operation is successful, returns a tuple of
(True, data); otherwise returns a tuple of (False, err) where the
err contains an error message.
"""
ctx = self._build_context(cluster_id)
lb_driver = lbaas.LoadBalancerDriver(ctx)
if res is not True:
return res, reason
else:
return res, 'lb resources deleting succeeded'
cp = cluster_policy.ClusterPolicy.load(context.get_current(),
cluster_id, self.id)
res, reason = lb_driver.lb_delete(**cp.data)
if res is False:
return False, reason
return True, 'LB resources deletion succeeded.'
def post_op(self, cluster_id, action):
"""Add new created node(s) to lb pool"""
"""Routine to be called after an action has been executed.
For this particular policy, we take this chance to update the pool
maintained by the load-balancer.
:param cluster_id: The ID of the cluster on which a relevant action
has been executed.
:param action: The action object that triggered this operation.
:returns: Nothing.
"""
ctx = self._build_context(cluster_id)
lb_driver = lbaas.LoadBalancerDriver(ctx)
self.action = action
cp = cluster_policy.ClusterPolicy.load(action.context, cluster_id,
self.id)
lb_id = cp.data['loadbalancer']
pool_id = cp.data['pool']
port = self.pool_spec.get(self.POOL_PROTOCOL_PORT)
subnet = self.pool_spec.get(self.POOL_SUBNET)
nodes = action.data.get('nodes')
if nodes is None:
nodes = action.data.get('nodes', [])
if len(nodes) == 0:
return
for node_id in nodes:
@@ -263,61 +290,64 @@ class LoadBalancingPolicy(base.Policy):
consts.CLUSTER_SCALE_IN))\
or (action.action == consts.CLUSTER_RESIZE and
action.data.get('deletion')):
if member_id:
# Remove nodes that have been deleted from lb pool
params = {
'pool_id': pool_id,
'member_id': member_id,
}
res = self.lb_driver.member_remove(**params)
if res is not True:
action.data['status'] = base.CHECK_ERROR
action.data['reason'] = _('Failed in removing deleted '
'node from lb pool')
return
else:
msg = _('Node %(node)s is not in loadbalancer pool '
'%(pool)s when being deleted from cluster '
'%(cluster)s.') % {'node': node_id,
'pool': pool_id,
'cluster': node.cluster_id}
LOG.warning(msg)
if member_id is None:
LOG.warning(_LW('Node %(node)s not found in loadbalancer '
'pool %(pool)s.'),
{'node': node_id, 'pool': pool_id})
return
# Remove nodes that have been deleted from lb pool
res = lb_driver.member_remove(lb_id, pool_id, member_id)
if res is not True:
action.data['status'] = base.CHECK_ERROR
action.data['reason'] = _('Failed in removing deleted '
'node from lb pool')
return
if (action.action in (consts.CLUSTER_ADD_NODES,
consts.CLUSTER_SCALE_OUT))\
or (action.action == consts.CLUSTER_RESIZE and
action.data.get('creation')):
if member_id is None:
# Add new created nodes into lb pool
params = {
'pool_id': pool_id,
'node': node,
'port': port,
'subnet': subnet
}
member_id = self.lb_driver.member_add(**params)
if member_id is None:
action.data['status'] = base.CHECK_ERROR
action.data['reason'] = _('Failed in adding new node '
'into lb pool')
return
node.data.update({'lb_member': member_id})
node.store(action.context)
else:
msg = _('Node %(node)s has been in a loadbalancer pool as'
'member %(member)s before being added to cluster '
'%(cluster)s.') % {'node': node_id,
'member': member_id,
'cluster': node.cluster_id}
LOG.warning(msg)
if member_id:
LOG.warning(_LW('Node %(node)s already in loadbalancer '
'pool %(pool)s.'),
{'node': node_id, 'pool': pool_id})
return
res = lb_driver.member_add(node, lb_id, pool_id, port, subnet)
if res is None:
action.data['status'] = base.CHECK_ERROR
action.data['reason'] = _('Failed in adding new node '
'into lb pool')
return
node.data.update({'lb_member': res})
node.store(action.context)
return
@property
def lb_driver(self):
if self.lb is None:
self.lb = lbaas.LoadBalancerDriver(self.action.context)
return self.lb
else:
return self.lb
def _build_context(self, cluster_id):
"""Build a trust-based context for connection parameters.
:param cluster_id: the ID of the cluster for which the trust will be
checked.
"""
ctx = context.get_service_context()
params = {
'auth_url': ctx['auth_url'],
'user_name': ctx['user_name'],
'user_domain_name': ctx['user_domain_name'],
'password': ctx['password'],
}
cluster = db_api.cluster_get(oslo_context.get_current(), cluster_id)
cred = db_api.cred_get(oslo_context.get_current(),
cluster.user, cluster.project)
if cred is None:
raise exception.TrustNotFound(trustor=cluster.user)
params['trusts'] = cred.cred['openstack']['trust']
params['project_id'] = cluster.project
return context.from_dict(params)