From 0a99398230a7b0187bf0a5b8423112d1d0559fed Mon Sep 17 00:00:00 2001 From: marcrp Date: Thu, 22 Aug 2013 13:08:30 -0400 Subject: [PATCH] [API] Active monitoring support Added all API related code to support Active health monitoring as defined in the Atlas-LB specification. Change-Id: If5e468e6171ad7e1d4da78d004ae6cde5f0824e6 --- libra/api/controllers/health_monitor.py | 205 ++++++++++++++++++++++-- libra/api/controllers/load_balancers.py | 5 +- libra/api/library/gearman_client.py | 32 +++- libra/api/model/lbaas.py | 22 +++ libra/api/model/lbaas.sql | 12 ++ libra/api/model/validators.py | 16 ++ 6 files changed, 277 insertions(+), 15 deletions(-) diff --git a/libra/api/controllers/health_monitor.py b/libra/api/controllers/health_monitor.py index 740b3c8e..c5594c3d 100644 --- a/libra/api/controllers/health_monitor.py +++ b/libra/api/controllers/health_monitor.py @@ -13,27 +13,67 @@ # License for the specific language governing permissions and limitations # under the License. -from pecan import response +from pecan import request from pecan.rest import RestController +import wsmeext.pecan as wsme_pecan +from wsme.exc import ClientSideError +from wsme import Unset +from libra.api.model.lbaas import LoadBalancer, db_session +from libra.api.model.lbaas import Device, HealthMonitor +from libra.api.acl import get_limited_to_project +from libra.api.model.validators import LBMonitorPut, LBMonitorResp +from libra.api.library.gearman_client import submit_job +from libra.api.library.exp import NotFound class HealthMonitorController(RestController): - """functions for /loadbalancers/{loadBalancerId}/healthmonitor/* routing""" + """functions for /loadbalancers/{loadBalancerId}/healthmonitor routing""" + def __init__(self, load_balancer_id=None): + self.lbid = load_balancer_id - def get(self, load_balancer_id): + @wsme_pecan.wsexpose(None) + def get(self): """Retrieve the health monitor configuration, if one exists. - - :param load_balancer_id: id of lb - Url: GET /loadbalancers/{load_balancer_id}/healthmonitor Returns: dict """ - response.status = 201 - return None - def post(self, load_balancer_id, *args): + if not self.lbid: + raise ClientSideError('Load Balancer ID has not been supplied') + + tenant_id = get_limited_to_project(request.headers) + with db_session() as session: + # grab the lb + lb, monitor = session.query(LoadBalancer, HealthMonitor).\ + outerjoin(LoadBalancer.monitors).\ + filter(LoadBalancer.id == self.lbid).\ + filter(LoadBalancer.tenantid == tenant_id).\ + filter(LoadBalancer.status != 'DELETED').first() + + if lb is None: + session.rollback() + raise NotFound('Load Balancer ID is not valid') + + if monitor is None: + session.rollback() + return {} + + monitor_data = { + 'type': monitor.type, + 'delay': monitor.delay, + 'timeout': monitor.timeout, + 'attemptsBeforeDeactivation': monitor.attempts + } + + if monitor.path: + monitor_data['path'] = monitor.path + + return monitor_data + + @wsme_pecan.wsexpose(LBMonitorResp, body=LBMonitorPut, status_code=202) + def put(self, body=None): """Update the settings for a health monitor. :param load_balancer_id: id of lb @@ -44,10 +84,119 @@ class HealthMonitorController(RestController): Returns: dict """ - response.status = 201 - return None + if not self.lbid: + raise ClientSideError('Load Balancer ID has not been supplied') - def delete(self, load_balancer_id): + tenant_id = get_limited_to_project(request.headers) + with db_session() as session: + # grab the lb + lb, monitor = session.query(LoadBalancer, HealthMonitor).\ + outerjoin(LoadBalancer.monitors).\ + filter(LoadBalancer.id == self.lbid).\ + filter(LoadBalancer.tenantid == tenant_id).\ + filter(LoadBalancer.status != 'DELETED').first() + + if lb is None: + session.rollback() + raise NotFound('Load Balancer ID is not valid') + + # Check inputs + if ( + body.type == Unset or + body.delay == Unset or + body.timeout == Unset or + body.attemptsBeforeDeactivation == Unset + ): + session.rollback() + raise ClientSideError( + "Missing field(s): {0}, {1}, {2}, and {3} are required" + .format("type", "delay", "timeout", + "attemptsBeforeDeactivation") + ) + + data = { + "lbid": self.lbid, + "type": body.type, + "delay": int(body.delay), + "timeout": int(body.timeout), + "attempts": int(body.attemptsBeforeDeactivation) + } + + # Path only required when type is not CONNECT + if body.path != Unset and body.path is not None: + if body.type == "CONNECT": + session.rollback() + raise ClientSideError( + "Path argument is invalid with CONNECT type" + ) + data["path"] = body.path + # If path is empty, set to / + if len(data["path"]) == 0 or data["path"][0] != "/": + session.rollback() + raise ClientSideError( + "Path must begin with leading /" + ) + else: + if body.type != "CONNECT": + session.rollback() + raise ClientSideError( + "Path argument is required" + ) + data["path"] = None + + if data["timeout"] > data["delay"]: + session.rollback() + raise ClientSideError( + "timeout cannot be greater than delay" + ) + + if (data["attempts"] < 1 or data["attempts"] > 10): + session.rollback() + raise ClientSideError( + "attemptsBeforeDeactivation must be between 1 and 10" + ) + + if monitor is None: + # This is ok for LBs that already existed without + # monitoring. Create a new entry. + monitor = HealthMonitor( + lbid=self.lbid, type=data["type"], delay=data["delay"], + timeout=data["timeout"], attempts=data["attempts"], + path=data["path"] + ) + session.add(monitor) + else: + # Modify the existing entry. + monitor.type = data["type"] + monitor.delay = data["delay"] + monitor.timeout = data["timeout"] + monitor.attempts = data["attempts"] + monitor.path = data["path"] + + lb.status = 'PENDING_UPDATE' + device = session.query( + Device.id, Device.name + ).join(LoadBalancer.devices).\ + filter(LoadBalancer.id == self.lbid).\ + first() + + return_data = LBMonitorResp() + return_data.type = data["type"] + return_data.delay = str(data["delay"]) + return_data.timeout = str(data["timeout"]) + return_data.attemptsBeforeDeactivation =\ + str(data["attempts"]) + if ((data["path"] is not None) and (len(data["path"]) > 0)): + return_data.path = data["path"] + + session.commit() + submit_job( + 'UPDATE', device.name, device.id, lb.id + ) + return return_data + + @wsme_pecan.wsexpose(None, status_code=202) + def delete(self): """Remove the health monitor. :param load_balancer_id: id of lb @@ -57,4 +206,34 @@ class HealthMonitorController(RestController): Returns: void """ - response.status = 201 + + if not self.lbid: + raise ClientSideError('Load Balancer ID has not been supplied') + + tenant_id = get_limited_to_project(request.headers) + with db_session() as session: + load_balancer, monitor = session.query( + LoadBalancer, HealthMonitor + ).outerjoin(LoadBalancer.monitors).\ + filter(LoadBalancer.tenantid == tenant_id).\ + filter(LoadBalancer.id == self.lbid).\ + filter(LoadBalancer.status != 'DELETED').\ + first() + if load_balancer is None: + session.rollback() + raise NotFound("Load Balancer not found") + + if monitor is not None: + session.delete(monitor) + session.flush() + + device = session.query( + Device.id, Device.name + ).join(LoadBalancer.devices).\ + filter(LoadBalancer.id == self.lbid).\ + first() + session.commit() + submit_job( + 'UPDATE', device.name, device.id, self.lbid + ) + return None diff --git a/libra/api/controllers/load_balancers.py b/libra/api/controllers/load_balancers.py index b0635f83..10097961 100644 --- a/libra/api/controllers/load_balancers.py +++ b/libra/api/controllers/load_balancers.py @@ -21,7 +21,9 @@ from wsme import Unset # other controllers from nodes import NodesController from virtualips import VipsController +from health_monitor import HealthMonitorController from logs import LogsController + # models from libra.api.model.lbaas import LoadBalancer, Device, Node, db_session from libra.api.model.lbaas import loadbalancers_devices, Limits @@ -319,7 +321,6 @@ class LoadBalancersController(RestController): # now save the loadbalancer_id to the device and switch its status # to online device.status = "ONLINE" - session.flush() return_data = LBResp() @@ -463,6 +464,8 @@ class LoadBalancersController(RestController): return VipsController(lbid), remainder[1:] if remainder[0] == 'logs': return LogsController(lbid), remainder[1:] + if remainder[0] == 'healthmonitor': + return HealthMonitorController(lbid), remainder[1:] # Kludgy fix for PUT since WSME doesn't like IDs on the path elif lbid: diff --git a/libra/api/library/gearman_client.py b/libra/api/library/gearman_client.py index 3eebc805..ec436460 100644 --- a/libra/api/library/gearman_client.py +++ b/libra/api/library/gearman_client.py @@ -17,6 +17,7 @@ eventlet.monkey_patch() import logging from libra.common.json_gearman import JSONGearmanClient from libra.api.model.lbaas import LoadBalancer, db_session, Device, Node +from libra.api.model.lbaas import HealthMonitor from libra.api.model.lbaas import loadbalancers_devices from sqlalchemy.exc import OperationalError from pecan import conf @@ -153,6 +154,8 @@ class GearmanClientThread(object): )) session.query(Node).\ filter(Node.lbid == lb.id).delete() + session.query(HealthMonitor).\ + filter(HealthMonitor.lbid == lb.id).delete() session.commit() def _set_error(self, device_id, errmsg, session): @@ -218,7 +221,8 @@ class GearmanClientThread(object): 'protocol': lb.protocol, 'algorithm': lb.algorithm, 'port': lb.port, - 'nodes': [] + 'nodes': [], + 'monitor': {} } for node in lb.nodes: if not node.enabled: @@ -230,7 +234,33 @@ class GearmanClientThread(object): 'condition': condition } lb_data['nodes'].append(node_data) + + # Add a default health monitor if one does not exist + monitor = session.query(HealthMonitor).\ + filter(HealthMonitor.lbid == lb.id).first() + + if monitor is None: + # Set it to a default configuration + monitor = HealthMonitor( + lbid=lb.id, type="CONNECT", delay=30, + timeout=30, attempts=2, path=None + ) + session.add(monitor) + session.flush() + + monitor_data = { + 'type': monitor.type, + 'delay': monitor.delay, + 'timeout': monitor.timeout, + 'attempts': monitor.attempts + } + if monitor.path is not None: + monitor_data['path'] = monitor.path + + lb_data['monitor'] = monitor_data job_data['loadBalancers'].append(lb_data) + + # Update the worker status, response = self._send_message(job_data) lb = session.query(LoadBalancer).\ filter(LoadBalancer.id == self.lbid).\ diff --git a/libra/api/model/lbaas.py b/libra/api/model/lbaas.py index cb4732a3..7aaaad65 100644 --- a/libra/api/model/lbaas.py +++ b/libra/api/model/lbaas.py @@ -87,6 +87,11 @@ class LoadBalancer(DeclarativeBase): nodes = relationship( 'Node', backref=backref('loadbalancers', order_by='Node.id') ) + monitors = relationship( + 'HealthMonitor', backref=backref( + 'loadbalancers', + order_by='HealthMonitor.lbid') + ) devices = relationship( 'Device', secondary=loadbalancers_devices, backref='loadbalancers', lazy='joined' @@ -108,6 +113,23 @@ class Node(DeclarativeBase): weight = Column(u'weight', INTEGER(), nullable=False) +class HealthMonitor(DeclarativeBase): + """monitors model""" + __tablename__ = 'monitors' + #column definitions + lbid = Column( + u'lbid', BIGINT(), ForeignKey('loadbalancers.id'), primary_key=True, + nullable=False + ) + type = Column(u'type', VARCHAR(length=128), nullable=False) + delay = Column(u'delay', INTEGER(), nullable=False) + timeout = Column(u'timeout', INTEGER(), nullable=False) + attempts = Column( + u'attemptsBeforeDeactivation', INTEGER(), nullable=False + ) + path = Column(u'path', VARCHAR(length=2000)) + + class RoutingSession(Session): """ If an engine is already in use, re-use it. Otherwise we can end up with deadlocks in Galera, see http://tinyurl.com/9h6qlly diff --git a/libra/api/model/lbaas.sql b/libra/api/model/lbaas.sql index d8cbc078..7ea3167c 100644 --- a/libra/api/model/lbaas.sql +++ b/libra/api/model/lbaas.sql @@ -64,3 +64,15 @@ CREATE TABLE `loadbalancers_devices` ( `device` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=latin1 + +CREATE TABLE monitors ( + lbid BIGINT NOT NULL, # Loadbalancer who owns this node + type VARCHAR(128) NOT NULL, # Type of ping. CONNECT, HTTP, HTTPS + delay INT NOT NULL, # This is the minimum time in seconds between regular calls to a monitor + timeout INT NOT NULL, # Maximum number of seconds to wait for a connection to the node before it times out. + attemptsBeforeDeactivation INT NOT NULL, # Number of permissible failures before removing a node from rotation. 1 to 10. + path VARCHAR(2000) NULL, # The HTTP path used in the request by the monitor. Begins with / + PRIMARY KEY (lbid) # ids are unique accross all Nodes + ) DEFAULT CHARSET utf8 DEFAULT COLLATE utf8_general_ci; + + diff --git a/libra/api/model/validators.py b/libra/api/model/validators.py index fd90cc8f..7416f27a 100644 --- a/libra/api/model/validators.py +++ b/libra/api/model/validators.py @@ -93,3 +93,19 @@ class LBResp(Base): updated = wtypes.text virtualIps = wsattr(['LBVipResp']) nodes = wsattr(['LBRespNode']) + + +class LBMonitorPut(Base): + type = Enum(wtypes.text, 'CONNECT', 'HTTP') + delay = int + timeout = int + attemptsBeforeDeactivation = int + path = wtypes.text + + +class LBMonitorResp(Base): + type = wtypes.text + delay = wtypes.text + timeout = wtypes.text + attemptsBeforeDeactivation = wtypes.text + path = wtypes.text