[API] Active monitoring support
Added all API-related code to support active health monitoring as defined in the Atlas-LB specification. Change-Id: If5e468e6171ad7e1d4da78d004ae6cde5f0824e6
This commit is contained in:
@@ -13,27 +13,67 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from pecan import response
|
||||
from pecan import request
|
||||
from pecan.rest import RestController
|
||||
import wsmeext.pecan as wsme_pecan
|
||||
from wsme.exc import ClientSideError
|
||||
from wsme import Unset
|
||||
from libra.api.model.lbaas import LoadBalancer, db_session
|
||||
from libra.api.model.lbaas import Device, HealthMonitor
|
||||
from libra.api.acl import get_limited_to_project
|
||||
from libra.api.model.validators import LBMonitorPut, LBMonitorResp
|
||||
from libra.api.library.gearman_client import submit_job
|
||||
from libra.api.library.exp import NotFound
|
||||
|
||||
|
||||
class HealthMonitorController(RestController):
|
||||
"""functions for /loadbalancers/{loadBalancerId}/healthmonitor/* routing"""
|
||||
"""functions for /loadbalancers/{loadBalancerId}/healthmonitor routing"""
|
||||
def __init__(self, load_balancer_id=None):
|
||||
self.lbid = load_balancer_id
|
||||
|
||||
def get(self, load_balancer_id):
|
||||
@wsme_pecan.wsexpose(None)
|
||||
def get(self):
|
||||
"""Retrieve the health monitor configuration, if one exists.
|
||||
|
||||
:param load_balancer_id: id of lb
|
||||
|
||||
Url:
|
||||
GET /loadbalancers/{load_balancer_id}/healthmonitor
|
||||
|
||||
Returns: dict
|
||||
"""
|
||||
response.status = 201
|
||||
return None
|
||||
|
||||
def post(self, load_balancer_id, *args):
|
||||
if not self.lbid:
|
||||
raise ClientSideError('Load Balancer ID has not been supplied')
|
||||
|
||||
tenant_id = get_limited_to_project(request.headers)
|
||||
with db_session() as session:
|
||||
# grab the lb
|
||||
lb, monitor = session.query(LoadBalancer, HealthMonitor).\
|
||||
outerjoin(LoadBalancer.monitors).\
|
||||
filter(LoadBalancer.id == self.lbid).\
|
||||
filter(LoadBalancer.tenantid == tenant_id).\
|
||||
filter(LoadBalancer.status != 'DELETED').first()
|
||||
|
||||
if lb is None:
|
||||
session.rollback()
|
||||
raise NotFound('Load Balancer ID is not valid')
|
||||
|
||||
if monitor is None:
|
||||
session.rollback()
|
||||
return {}
|
||||
|
||||
monitor_data = {
|
||||
'type': monitor.type,
|
||||
'delay': monitor.delay,
|
||||
'timeout': monitor.timeout,
|
||||
'attemptsBeforeDeactivation': monitor.attempts
|
||||
}
|
||||
|
||||
if monitor.path:
|
||||
monitor_data['path'] = monitor.path
|
||||
|
||||
return monitor_data
|
||||
|
||||
@wsme_pecan.wsexpose(LBMonitorResp, body=LBMonitorPut, status_code=202)
|
||||
def put(self, body=None):
|
||||
"""Update the settings for a health monitor.
|
||||
|
||||
:param load_balancer_id: id of lb
|
||||
@@ -44,10 +84,119 @@ class HealthMonitorController(RestController):
|
||||
|
||||
Returns: dict
|
||||
"""
|
||||
response.status = 201
|
||||
return None
|
||||
if not self.lbid:
|
||||
raise ClientSideError('Load Balancer ID has not been supplied')
|
||||
|
||||
def delete(self, load_balancer_id):
|
||||
tenant_id = get_limited_to_project(request.headers)
|
||||
with db_session() as session:
|
||||
# grab the lb
|
||||
lb, monitor = session.query(LoadBalancer, HealthMonitor).\
|
||||
outerjoin(LoadBalancer.monitors).\
|
||||
filter(LoadBalancer.id == self.lbid).\
|
||||
filter(LoadBalancer.tenantid == tenant_id).\
|
||||
filter(LoadBalancer.status != 'DELETED').first()
|
||||
|
||||
if lb is None:
|
||||
session.rollback()
|
||||
raise NotFound('Load Balancer ID is not valid')
|
||||
|
||||
# Check inputs
|
||||
if (
|
||||
body.type == Unset or
|
||||
body.delay == Unset or
|
||||
body.timeout == Unset or
|
||||
body.attemptsBeforeDeactivation == Unset
|
||||
):
|
||||
session.rollback()
|
||||
raise ClientSideError(
|
||||
"Missing field(s): {0}, {1}, {2}, and {3} are required"
|
||||
.format("type", "delay", "timeout",
|
||||
"attemptsBeforeDeactivation")
|
||||
)
|
||||
|
||||
data = {
|
||||
"lbid": self.lbid,
|
||||
"type": body.type,
|
||||
"delay": int(body.delay),
|
||||
"timeout": int(body.timeout),
|
||||
"attempts": int(body.attemptsBeforeDeactivation)
|
||||
}
|
||||
|
||||
# Path only required when type is not CONNECT
|
||||
if body.path != Unset and body.path is not None:
|
||||
if body.type == "CONNECT":
|
||||
session.rollback()
|
||||
raise ClientSideError(
|
||||
"Path argument is invalid with CONNECT type"
|
||||
)
|
||||
data["path"] = body.path
|
||||
# If path is empty, set to /
|
||||
if len(data["path"]) == 0 or data["path"][0] != "/":
|
||||
session.rollback()
|
||||
raise ClientSideError(
|
||||
"Path must begin with leading /"
|
||||
)
|
||||
else:
|
||||
if body.type != "CONNECT":
|
||||
session.rollback()
|
||||
raise ClientSideError(
|
||||
"Path argument is required"
|
||||
)
|
||||
data["path"] = None
|
||||
|
||||
if data["timeout"] > data["delay"]:
|
||||
session.rollback()
|
||||
raise ClientSideError(
|
||||
"timeout cannot be greater than delay"
|
||||
)
|
||||
|
||||
if (data["attempts"] < 1 or data["attempts"] > 10):
|
||||
session.rollback()
|
||||
raise ClientSideError(
|
||||
"attemptsBeforeDeactivation must be between 1 and 10"
|
||||
)
|
||||
|
||||
if monitor is None:
|
||||
# This is ok for LBs that already existed without
|
||||
# monitoring. Create a new entry.
|
||||
monitor = HealthMonitor(
|
||||
lbid=self.lbid, type=data["type"], delay=data["delay"],
|
||||
timeout=data["timeout"], attempts=data["attempts"],
|
||||
path=data["path"]
|
||||
)
|
||||
session.add(monitor)
|
||||
else:
|
||||
# Modify the existing entry.
|
||||
monitor.type = data["type"]
|
||||
monitor.delay = data["delay"]
|
||||
monitor.timeout = data["timeout"]
|
||||
monitor.attempts = data["attempts"]
|
||||
monitor.path = data["path"]
|
||||
|
||||
lb.status = 'PENDING_UPDATE'
|
||||
device = session.query(
|
||||
Device.id, Device.name
|
||||
).join(LoadBalancer.devices).\
|
||||
filter(LoadBalancer.id == self.lbid).\
|
||||
first()
|
||||
|
||||
return_data = LBMonitorResp()
|
||||
return_data.type = data["type"]
|
||||
return_data.delay = str(data["delay"])
|
||||
return_data.timeout = str(data["timeout"])
|
||||
return_data.attemptsBeforeDeactivation =\
|
||||
str(data["attempts"])
|
||||
if ((data["path"] is not None) and (len(data["path"]) > 0)):
|
||||
return_data.path = data["path"]
|
||||
|
||||
session.commit()
|
||||
submit_job(
|
||||
'UPDATE', device.name, device.id, lb.id
|
||||
)
|
||||
return return_data
|
||||
|
||||
@wsme_pecan.wsexpose(None, status_code=202)
|
||||
def delete(self):
|
||||
"""Remove the health monitor.
|
||||
|
||||
:param load_balancer_id: id of lb
|
||||
@@ -57,4 +206,34 @@ class HealthMonitorController(RestController):
|
||||
|
||||
Returns: void
|
||||
"""
|
||||
response.status = 201
|
||||
|
||||
if not self.lbid:
|
||||
raise ClientSideError('Load Balancer ID has not been supplied')
|
||||
|
||||
tenant_id = get_limited_to_project(request.headers)
|
||||
with db_session() as session:
|
||||
load_balancer, monitor = session.query(
|
||||
LoadBalancer, HealthMonitor
|
||||
).outerjoin(LoadBalancer.monitors).\
|
||||
filter(LoadBalancer.tenantid == tenant_id).\
|
||||
filter(LoadBalancer.id == self.lbid).\
|
||||
filter(LoadBalancer.status != 'DELETED').\
|
||||
first()
|
||||
if load_balancer is None:
|
||||
session.rollback()
|
||||
raise NotFound("Load Balancer not found")
|
||||
|
||||
if monitor is not None:
|
||||
session.delete(monitor)
|
||||
session.flush()
|
||||
|
||||
device = session.query(
|
||||
Device.id, Device.name
|
||||
).join(LoadBalancer.devices).\
|
||||
filter(LoadBalancer.id == self.lbid).\
|
||||
first()
|
||||
session.commit()
|
||||
submit_job(
|
||||
'UPDATE', device.name, device.id, self.lbid
|
||||
)
|
||||
return None
|
||||
|
||||
@@ -21,7 +21,9 @@ from wsme import Unset
|
||||
# other controllers
|
||||
from nodes import NodesController
|
||||
from virtualips import VipsController
|
||||
from health_monitor import HealthMonitorController
|
||||
from logs import LogsController
|
||||
|
||||
# models
|
||||
from libra.api.model.lbaas import LoadBalancer, Device, Node, db_session
|
||||
from libra.api.model.lbaas import loadbalancers_devices, Limits
|
||||
@@ -319,7 +321,6 @@ class LoadBalancersController(RestController):
|
||||
# now save the loadbalancer_id to the device and switch its status
|
||||
# to online
|
||||
device.status = "ONLINE"
|
||||
|
||||
session.flush()
|
||||
|
||||
return_data = LBResp()
|
||||
@@ -463,6 +464,8 @@ class LoadBalancersController(RestController):
|
||||
return VipsController(lbid), remainder[1:]
|
||||
if remainder[0] == 'logs':
|
||||
return LogsController(lbid), remainder[1:]
|
||||
if remainder[0] == 'healthmonitor':
|
||||
return HealthMonitorController(lbid), remainder[1:]
|
||||
|
||||
# Kludgy fix for PUT since WSME doesn't like IDs on the path
|
||||
elif lbid:
|
||||
|
||||
@@ -17,6 +17,7 @@ eventlet.monkey_patch()
|
||||
import logging
|
||||
from libra.common.json_gearman import JSONGearmanClient
|
||||
from libra.api.model.lbaas import LoadBalancer, db_session, Device, Node
|
||||
from libra.api.model.lbaas import HealthMonitor
|
||||
from libra.api.model.lbaas import loadbalancers_devices
|
||||
from sqlalchemy.exc import OperationalError
|
||||
from pecan import conf
|
||||
@@ -153,6 +154,8 @@ class GearmanClientThread(object):
|
||||
))
|
||||
session.query(Node).\
|
||||
filter(Node.lbid == lb.id).delete()
|
||||
session.query(HealthMonitor).\
|
||||
filter(HealthMonitor.lbid == lb.id).delete()
|
||||
session.commit()
|
||||
|
||||
def _set_error(self, device_id, errmsg, session):
|
||||
@@ -218,7 +221,8 @@ class GearmanClientThread(object):
|
||||
'protocol': lb.protocol,
|
||||
'algorithm': lb.algorithm,
|
||||
'port': lb.port,
|
||||
'nodes': []
|
||||
'nodes': [],
|
||||
'monitor': {}
|
||||
}
|
||||
for node in lb.nodes:
|
||||
if not node.enabled:
|
||||
@@ -230,7 +234,33 @@ class GearmanClientThread(object):
|
||||
'condition': condition
|
||||
}
|
||||
lb_data['nodes'].append(node_data)
|
||||
|
||||
# Add a default health monitor if one does not exist
|
||||
monitor = session.query(HealthMonitor).\
|
||||
filter(HealthMonitor.lbid == lb.id).first()
|
||||
|
||||
if monitor is None:
|
||||
# Set it to a default configuration
|
||||
monitor = HealthMonitor(
|
||||
lbid=lb.id, type="CONNECT", delay=30,
|
||||
timeout=30, attempts=2, path=None
|
||||
)
|
||||
session.add(monitor)
|
||||
session.flush()
|
||||
|
||||
monitor_data = {
|
||||
'type': monitor.type,
|
||||
'delay': monitor.delay,
|
||||
'timeout': monitor.timeout,
|
||||
'attempts': monitor.attempts
|
||||
}
|
||||
if monitor.path is not None:
|
||||
monitor_data['path'] = monitor.path
|
||||
|
||||
lb_data['monitor'] = monitor_data
|
||||
job_data['loadBalancers'].append(lb_data)
|
||||
|
||||
# Update the worker
|
||||
status, response = self._send_message(job_data)
|
||||
lb = session.query(LoadBalancer).\
|
||||
filter(LoadBalancer.id == self.lbid).\
|
||||
|
||||
@@ -87,6 +87,11 @@ class LoadBalancer(DeclarativeBase):
|
||||
nodes = relationship(
|
||||
'Node', backref=backref('loadbalancers', order_by='Node.id')
|
||||
)
|
||||
monitors = relationship(
|
||||
'HealthMonitor', backref=backref(
|
||||
'loadbalancers',
|
||||
order_by='HealthMonitor.lbid')
|
||||
)
|
||||
devices = relationship(
|
||||
'Device', secondary=loadbalancers_devices, backref='loadbalancers',
|
||||
lazy='joined'
|
||||
@@ -108,6 +113,23 @@ class Node(DeclarativeBase):
|
||||
weight = Column(u'weight', INTEGER(), nullable=False)
|
||||
|
||||
|
||||
class HealthMonitor(DeclarativeBase):
|
||||
"""monitors model"""
|
||||
__tablename__ = 'monitors'
|
||||
#column definitions
|
||||
lbid = Column(
|
||||
u'lbid', BIGINT(), ForeignKey('loadbalancers.id'), primary_key=True,
|
||||
nullable=False
|
||||
)
|
||||
type = Column(u'type', VARCHAR(length=128), nullable=False)
|
||||
delay = Column(u'delay', INTEGER(), nullable=False)
|
||||
timeout = Column(u'timeout', INTEGER(), nullable=False)
|
||||
attempts = Column(
|
||||
u'attemptsBeforeDeactivation', INTEGER(), nullable=False
|
||||
)
|
||||
path = Column(u'path', VARCHAR(length=2000))
|
||||
|
||||
|
||||
class RoutingSession(Session):
|
||||
""" If an engine is already in use, re-use it. Otherwise we can end up
|
||||
with deadlocks in Galera, see http://tinyurl.com/9h6qlly
|
||||
|
||||
@@ -64,3 +64,15 @@ CREATE TABLE `loadbalancers_devices` (
|
||||
`device` int(11) DEFAULT NULL,
|
||||
PRIMARY KEY (`id`)
|
||||
) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=latin1
|
||||
|
||||
CREATE TABLE monitors (
|
||||
lbid BIGINT NOT NULL, # Loadbalancer that owns this monitor
|
||||
type VARCHAR(128) NOT NULL, # Type of ping. CONNECT, HTTP, HTTPS
|
||||
delay INT NOT NULL, # This is the minimum time in seconds between regular calls to a monitor
|
||||
timeout INT NOT NULL, # Maximum number of seconds to wait for a connection to the node before it times out.
|
||||
attemptsBeforeDeactivation INT NOT NULL, # Number of permissible failures before removing a node from rotation. 1 to 10.
|
||||
path VARCHAR(2000) NULL, # The HTTP path used in the request by the monitor. Begins with /
|
||||
PRIMARY KEY (lbid) # lbid is unique across all monitors
|
||||
) DEFAULT CHARSET utf8 DEFAULT COLLATE utf8_general_ci;
|
||||
|
||||
|
||||
|
||||
@@ -93,3 +93,19 @@ class LBResp(Base):
|
||||
updated = wtypes.text
|
||||
virtualIps = wsattr(['LBVipResp'])
|
||||
nodes = wsattr(['LBRespNode'])
|
||||
|
||||
|
||||
class LBMonitorPut(Base):
|
||||
type = Enum(wtypes.text, 'CONNECT', 'HTTP')
|
||||
delay = int
|
||||
timeout = int
|
||||
attemptsBeforeDeactivation = int
|
||||
path = wtypes.text
|
||||
|
||||
|
||||
class LBMonitorResp(Base):
|
||||
type = wtypes.text
|
||||
delay = wtypes.text
|
||||
timeout = wtypes.text
|
||||
attemptsBeforeDeactivation = wtypes.text
|
||||
path = wtypes.text
|
||||
|
||||
Reference in New Issue
Block a user