[ADMIN_API] passive health monitoring changes

Added support to statsd for monitoring nodes going UP and DOWN
as reported by the worker/haproxy.

Change-Id: Id3787343d5b019e409a8b841b582588b6c913058
Author: marcrp
Date: 2013-07-31 18:05:54 -04:00
Committed by: Andrew Hutchings
Parent: 4e6280a730
Commit: 5a65fdc4d0
9 changed files with 176 additions and 39 deletions

View File

@@ -123,10 +123,6 @@ def main():
         choices=known_drivers.keys(), default='dummy',
         help='type of stats device to use'
     )
-    options.parser.add_argument(
-        '--stats_ping_timer', type=int, default=60,
-        help='how often to ping load balancers (in seconds)'
-    )
     options.parser.add_argument(
         '--stats_poll_timeout', type=int, default=5,
         help='gearman timeout value for initial ping request (in seconds)'
@@ -135,11 +131,6 @@ def main():
         '--stats_poll_timeout_retry', type=int, default=30,
         help='gearman timeout value for retry ping request (in seconds)'
    )
-    options.parser.add_argument(
-        '--stats_repair_timer', type=int, default=180,
-        help='how often to check if a load balancer has been repaired (in '
-             'seconds)'
-    )
     options.parser.add_argument(
         '--number_of_servers', type=int, default=1,
         help='number of Admin API servers, used to calculate which Admin API '

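The two options removed above are not simply dropped, they are superseded: the Stats class (below) now pins ping and repair checks to fixed second marks within each minute (PING_SECONDS = 15, REPAIR_SECONDS = 45), so checks fire once a minute at predictable offsets instead of on configurable timers. A minimal sketch of the sleep computation the new schedulers use:

    from datetime import datetime

    PING_SECONDS = 15  # fire at second 15 of every minute

    def seconds_until(mark):
        # Sleep long enough to wake at the given second mark.
        now = datetime.now().second
        return mark - now if now < mark else 60 - (now - mark)

    # e.g. at :50 past the minute, the 15-second mark is 25 seconds away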
View File

@@ -28,3 +28,6 @@ class AlertDriver(object):
     def send_repair(self, message, device_id):
         raise NotImplementedError()
+
+    def send_node_change(self, message, lbid, degraded):
+        raise NotImplementedError()

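Because the base class raises NotImplementedError, existing third-party drivers keep working: the stats scheduler (below) wraps its send_node_change call in a try/except for exactly that case. A hedged sketch of a driver opting in; LogOnlyDriver is hypothetical, not part of this commit, and assumes the usual AlertDriver construction with (logger, args):

    class LogOnlyDriver(AlertDriver):
        # Hypothetical driver: records node transitions instead of
        # alerting anyone.
        def send_node_change(self, message, lbid, degraded):
            state = 'DEGRADED' if degraded else 'ACTIVE'
            self.logger.info(
                'LB {0} is {1}: {2}'.format(lbid, state, message))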
View File

@@ -46,3 +46,21 @@ class DbDriver(AlertDriver):
         session.flush()
         session.commit()
+
+    def send_node_change(self, message, lbid, degraded):
+        with db_session() as session:
+            lb = session.query(LoadBalancer).\
+                filter(LoadBalancer.id == lbid).first()
+            if lb.status == 'ERROR':
+                lb_status = lb.status
+            else:
+                lb_status = 'DEGRADED' if degraded else 'ACTIVE'
+            session.query(LoadBalancer).\
+                filter(LoadBalancer.id == lbid).\
+                update({"status": lb_status, "errmsg": message},
+                       synchronize_session='fetch')
+            session.commit()

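The status resolution here is worth spelling out: a load balancer already in ERROR stays in ERROR regardless of node health; otherwise any DOWN node marks it DEGRADED and an all-UP report returns it to ACTIVE. The same rule as a standalone sketch (a hypothetical helper; the commit inlines it):

    def resolve_lb_status(current, degraded):
        # A device-level ERROR always wins over node-level state.
        if current == 'ERROR':
            return 'ERROR'
        return 'DEGRADED' if degraded else 'ACTIVE'

    assert resolve_lb_status('ERROR', False) == 'ERROR'
    assert resolve_lb_status('ACTIVE', True) == 'DEGRADED'
    assert resolve_lb_status('DEGRADED', False) == 'ACTIVE'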
View File

@@ -18,5 +18,8 @@ class DummyDriver(AlertDriver):
     def send_alert(self, message, device_id):
         self.logger.info('Dummy alert of: {0}'.format(message))

-    def send_repair(self, message, dervice_id):
+    def send_repair(self, message, device_id):
         self.logger.info('Dummy repair of: {0}'.format(message))
+
+    def send_node_change(self, message, lbid, degraded):
+        self.logger.info('Dummy node change of: {0}'.format(message))

View File

@@ -16,7 +16,7 @@ import threading
 import signal
 import sys
 from datetime import datetime
-from libra.admin_api.model.lbaas import LoadBalancer, Device, db_session
+from libra.admin_api.model.lbaas import LoadBalancer, Device, Node, db_session
 from libra.admin_api.stats.stats_gearman import GearJobs
@@ -25,6 +25,10 @@ class NodeNotFound(Exception):
 class Stats(object):
+    PING_SECONDS = 15
+    REPAIR_SECONDS = 45
+
     def __init__(self, logger, args, drivers):
         self.logger = logger
         self.args = args
@@ -36,8 +40,8 @@ class Stats(object):
         signal.signal(signal.SIGTERM, self.exit_handler)
         logger.info("Selected stats drivers: {0}".format(args.stats_driver))

-        self.ping_lbs()
-        self.repair_lbs()
+        self.start_ping_sched()
+        self.start_repair_sched()

     def exit_handler(self, signum, frame):
         signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -109,11 +113,15 @@ class Stats(object):
             for lb in devices:
                 node_list.append(lb.name)
             gearman = GearJobs(self.logger, self.args)
-            failed_nodes = gearman.send_pings(node_list)
-            failed = len(failed_nodes)
+            failed_lbs, node_status = gearman.send_pings(node_list)
+            failed = len(failed_lbs)
             if failed > 0:
-                self._send_fails(failed_nodes, session)
+                self._send_fails(failed_lbs, session)
             session.commit()
+            # Process node status after lb status
+            self._update_nodes(node_status, session)

         return pings, failed

     def _exec_repair(self):
@@ -133,16 +141,20 @@ class Stats(object):
             for lb in devices:
                 node_list.append(lb.name)
             gearman = GearJobs(self.logger, self.args)
-            repaired_nodes = gearman.send_repair(node_list)
-            repaired = len(repaired_nodes)
+            repaired_lbs, node_status = gearman.send_repair(node_list)
+            repaired = len(repaired_lbs)
             if repaired > 0:
-                self._send_repair(repaired_nodes, session)
+                self._send_repair(repaired_lbs, session)
             session.commit()
+            # Process node status after lb status
+            self._update_nodes(node_status, session)

         return tested, repaired

-    def _send_fails(self, failed_nodes, session):
-        for node in failed_nodes:
-            data = self._get_node(node, session)
+    def _send_fails(self, failed_lbs, session):
+        for lb in failed_lbs:
+            data = self._get_lb(lb, session)
             if not data:
                 self.logger.error(
                     'Device {0} has no Loadbalancer attached'.
@@ -162,14 +174,14 @@ class Stats(object):
             instance = driver(self.logger, self.args)
             self.logger.info(
                 'Sending failure of {0} to {1}'.format(
-                    node, instance.__class__.__name__
+                    lb, instance.__class__.__name__
                 )
             )
             instance.send_alert(message, data.id)

     def _send_repair(self, repaired_nodes, session):
-        for node in repaired_nodes:
-            data = self._get_node(node, session)
+        for lb in repaired_nodes:
+            data = self._get_lb(lb, session)
             message = (
                 'Load balancer repaired\n'
                 'ID: {0}\n'
@@ -183,29 +195,120 @@ class Stats(object):
             instance = driver(self.logger, self.args)
             self.logger.info(
                 'Sending repair of {0} to {1}'.format(
-                    node, instance.__class__.__name__
+                    lb, instance.__class__.__name__
                 )
             )
             instance.send_repair(message, data.id)

-    def _get_node(self, node, session):
+    def _get_lb(self, lb, session):
         lb = session.query(
             LoadBalancer.tenantid, Device.floatingIpAddr, Device.id
         ).join(LoadBalancer.devices).\
-            filter(Device.name == node).first()
+            filter(Device.name == lb).first()
         return lb

+    def _update_nodes(self, node_status, session):
+        for lb, nodes in node_status.iteritems():
+            data = self._get_lb(lb, session)
+            if not data:
+                self.logger.error(
+                    'Device {0} has no Loadbalancer attached'.format(lb)
+                )
+                continue
+
+            # Iterate the list of nodes returned from the worker
+            # and track any status changes
+            lbids = []
+            degraded = []
+            failed_nodes = dict()
+            repaired_nodes = dict()
+            for node in nodes:
+                # Get the last known status from the nodes table
+                node_data = session.query(Node).\
+                    filter(Node.id == node['id']).first()
+
+                # Note all degraded LBs
+                if (node['status'] == 'DOWN' and
+                        node_data.lbid not in degraded):
+                    degraded.append(node_data.lbid)
+
+                # Compare the worker's status to the last known status
+                if node['status'] == 'DOWN' and node_data.status == 'ONLINE':
+                    failed_nodes.setdefault(node_data.lbid, []).\
+                        append(node['id'])
+                elif node['status'] == 'UP' and node_data.status == 'ERROR':
+                    repaired_nodes.setdefault(node_data.lbid, []).\
+                        append(node['id'])
+                else:
+                    # No change
+                    continue
+
+                # Note all LBs with node status changes
+                if node_data.lbid not in lbids:
+                    lbids.append(node_data.lbid)
+
+                # Change the node status in the node table, mapping the
+                # worker's UP/DOWN onto the ONLINE/ERROR states used above
+                new_status = 'ONLINE' if node['status'] == 'UP' else 'ERROR'
+                session.query(Node).\
+                    filter(Node.id == node['id']).\
+                    update({"status": new_status},
+                           synchronize_session='fetch')
+                session.flush()
+            session.commit()
+
+            # Generate a status message per LB for the alert.
+            for lbid in lbids:
+                message = (
+                    'Node status change\n'
+                    'ID: {0}\n'
+                    'IP: {1}\n'
+                    'tenant: {2}\n'.format(
+                        lbid, data.floatingIpAddr, data.tenantid)
+                )
+                if lbid in failed_nodes:
+                    message += ' failed: '
+                    message += ','.join(str(x) for x in failed_nodes[lbid])
+                    message += '\n'
+                if lbid in repaired_nodes:
+                    message += ' repaired: '
+                    message += ','.join(str(x) for x in repaired_nodes[lbid])
+
+                # Send the LB node change alert
+                is_degraded = lbid in degraded
+                for driver in self.drivers:
+                    instance = driver(self.logger, self.args)
+                    self.logger.info(
+                        'Sending node change on LB {0} to {1}'.format(
+                            lbid, instance.__class__.__name__)
+                    )
+                    try:
+                        instance.send_node_change(message, lbid, is_degraded)
+                    except NotImplementedError:
+                        pass
     def start_ping_sched(self):
+        # Always try to hit the expected second mark for pings
+        seconds = datetime.now().second
+        if seconds < self.PING_SECONDS:
+            sleeptime = self.PING_SECONDS - seconds
+        else:
+            sleeptime = 60 - (seconds - self.PING_SECONDS)
+
         self.logger.info('LB ping check timer sleeping for {secs} seconds'
-                         .format(secs=self.args.stats_ping_timer))
-        self.ping_timer = threading.Timer(self.args.stats_ping_timer,
-                                          self.ping_lbs, ())
+                         .format(secs=sleeptime))
+        self.ping_timer = threading.Timer(sleeptime, self.ping_lbs, ())
         self.ping_timer.start()

     def start_repair_sched(self):
+        # Always try to hit the expected second mark for repairs
+        seconds = datetime.now().second
+        if seconds < self.REPAIR_SECONDS:
+            sleeptime = self.REPAIR_SECONDS - seconds
+        else:
+            sleeptime = 60 - (seconds - self.REPAIR_SECONDS)
+
         self.logger.info('LB repair check timer sleeping for {secs} seconds'
-                         .format(secs=self.args.stats_repair_timer))
-        self.repair_timer = threading.Timer(self.args.stats_repair_timer,
-                                            self.repair_lbs, ())
+                         .format(secs=sleeptime))
+        self.repair_timer = threading.Timer(sleeptime, self.repair_lbs, ())
         self.repair_timer.start()

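Note that _update_nodes reacts to exactly two transitions: a node the database last saw ONLINE that the worker now reports DOWN (a new failure), and a node previously marked ERROR that comes back UP (a repair); everything else is a no-op. A minimal sketch of that decision table, assuming the UP/DOWN-to-ONLINE/ERROR mapping used above:

    def classify(worker_status, db_status):
        # Returns 'failed', 'repaired', or None (nothing to record).
        if worker_status == 'DOWN' and db_status == 'ONLINE':
            return 'failed'
        if worker_status == 'UP' and db_status == 'ERROR':
            return 'repaired'
        return None

    assert classify('DOWN', 'ONLINE') == 'failed'
    assert classify('UP', 'ERROR') == 'repaired'
    assert classify('DOWN', 'ERROR') is None  # already known to be down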
View File

@@ -41,6 +41,7 @@ class GearJobs(object):
         # TODO: lots of duplicated code that needs cleanup
         list_of_jobs = []
         failed_list = []
+        node_status = dict()
         retry_list = []
         job_data = {"hpcs_action": "STATS"}
         for node in node_list:
@@ -67,6 +68,9 @@ class GearJobs(object):
                 # Error returned by Gearman
                 failed_list.append(ping.job.task)
                 continue
+            else:
+                if 'nodes' in ping.result:
+                    node_status[ping.job.task] = ping.result['nodes']

         list_of_jobs = []
         if len(retry_list) > 0:
@@ -97,12 +101,16 @@ class GearJobs(object):
                     # Error returned by Gearman
                     failed_list.append(ping.job.task)
                     continue
+                else:
+                    if 'nodes' in ping.result:
+                        node_status[ping.job.task] = ping.result['nodes']

-        return failed_list
+        return failed_list, node_status

     def send_repair(self, node_list):
         list_of_jobs = []
         repaired_list = []
+        node_status = dict()
         job_data = {"hpcs_action": "STATS"}
         for node in node_list:
             list_of_jobs.append(dict(task=str(node), data=job_data))
@@ -123,5 +131,7 @@ class GearJobs(object):
                 continue
             else:
                 repaired_list.append(ping.job.task)
+                if 'nodes' in ping.result:
+                    node_status[ping.job.task] = ping.result['nodes']

-        return repaired_list
+        return repaired_list, node_status

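Both send_pings and send_repair now return a (list, dict) pair, where the dict maps a device name to the 'nodes' list from that worker's STATS reply. The commit does not define the reply format itself; based on the handling above and in _update_nodes, it is assumed to look roughly like this:

    # Assumed shape of a worker STATS reply; only 'nodes', 'id' and
    # 'status' are relied on by this commit.
    result = {
        'nodes': [
            {'id': 42, 'status': 'UP'},
            {'id': 43, 'status': 'DOWN'},
        ],
    }

    # Callers unpack the new two-value return:
    failed_lbs, node_status = gearman.send_pings(node_list)
    for device_name, nodes in node_status.items():
        down = [n['id'] for n in nodes if n['status'] == 'DOWN']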
View File

@@ -308,9 +308,10 @@ class LoadBalancersController(RestController):
                     enabled = 0
                 else:
                     enabled = 1
+                node_status = 'ONLINE' if enabled else 'OFFLINE'
                 out_node = Node(
                     lbid=lb.id, port=node.port, address=node.address,
-                    enabled=enabled, status='ONLINE', weight=1
+                    enabled=enabled, status=node_status, weight=1
                 )
                 session.add(out_node)

View File

@@ -170,9 +170,10 @@ class NodesController(RestController):
                 enabled = 0
             else:
                 enabled = 1
+            node_status = 'ONLINE' if enabled else 'OFFLINE'
             new_node = Node(
                 lbid=self.lbid, port=node.port, address=node.address,
-                enabled=enabled, status='ONLINE', weight=1
+                enabled=enabled, status=node_status, weight=1
             )
             session.add(new_node)
             session.flush()
@@ -184,7 +185,7 @@ class NodesController(RestController):
                 NodeResp(
                     id=new_node.id, port=new_node.port,
                     address=new_node.address, condition=condition,
-                    status='ONLINE'
+                    status=new_node.status
                 )
            )
             device = session.query(
@@ -231,8 +232,10 @@ class NodesController(RestController):
         if body.condition != Unset:
             if body.condition == 'DISABLED':
                 node.enabled = 0
+                node.status = 'OFFLINE'
             else:
                 node.enabled = 1
+                node.status = 'ONLINE'
                 if nodecount <= 1:
                     session.rollback()
                     raise ClientSideError(

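Both controllers now derive a node's initial status from its enabled flag instead of hard-coding 'ONLINE', and the PUT handler keeps status in lockstep with condition changes. The shared rule, as a hypothetical helper (the commit inlines the expression in both places):

    def apply_condition(condition):
        # DISABLED is the only condition that takes a node out of rotation.
        enabled = 0 if condition == 'DISABLED' else 1
        return enabled, ('ONLINE' if enabled else 'OFFLINE')

    assert apply_condition('DISABLED') == (0, 'OFFLINE')
    assert apply_condition('ENABLED') == (1, 'ONLINE')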
View File

@@ -96,5 +96,10 @@ class HAProxyQuery(object):
                 junk, node_id = elements[1].split('-')
             else:
                 node_id = elements[1]
+
+            # All the way up is UP, otherwise call it DOWN
+            if elements[17] != "UP":
+                elements[17] = "DOWN"
+
             final_results.append((node_id, elements[17]))
         return final_results
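For context on elements[17]: in haproxy's 'show stat' CSV output the status field is the 18th column (index 17) and can read UP, DOWN, NOLB, MAINT, or a transitional value like 'UP 1/3' while a state change is in progress; the check above collapses anything that is not exactly 'UP' into DOWN. A minimal sketch against a fabricated CSV row (column values other than svname and status are filler):

    # svname is index 1, status is index 17
    row = 'lb,server-42,0,0,1,2,,10,500,600,,0,,0,0,0,0,UP 1/3,1,1'.split(',')

    node_id = row[1].split('-')[1] if '-' in row[1] else row[1]
    status = row[17] if row[17] == 'UP' else 'DOWN'
    assert (node_id, status) == ('42', 'DOWN')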