Merge "[ADMIN_API]: Added OFFLINE checker and bug fixes"

Jenkins
2013-09-18 18:48:09 +00:00
committed by Gerrit Code Review
16 changed files with 276 additions and 182 deletions

View File

@@ -110,6 +110,15 @@ Command Line Options
   How long to wait until we consider the second and final ping check
   failed. Default is 30 seconds.

.. option:: --stats_offline_ping_limit <COUNT>

   How many times to ping an OFFLINE load balancer before considering
   it unreachable and marking it for deletion.

.. option:: --stats_device_error_limit <COUNT>

   Maximum number of simultaneous device failures to allow recovery on.

.. option:: --number_of_servers <NUMBER_OF_SERVERS>

   The number of Admin API servers in the system.
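
Read together, the two new limits bound the OFFLINE checker from both sides: --stats_device_error_limit aborts a check cycle that reports an implausible number of simultaneous failures, and --stats_offline_ping_limit caps how many failed pings a single OFFLINE device may accumulate before it is flagged for deletion. A minimal sketch of that interaction, with a hypothetical mark_for_deletion standing in for the drivers' send_delete path:

# Sketch only: how the two limits above interact during one OFFLINE check
# cycle. ping_limit maps to --stats_offline_ping_limit, error_limit to
# --stats_device_error_limit; mark_for_deletion is a hypothetical stand-in
# for the drivers' send_delete call.
def handle_offline_failures(failed_devices, ping_counts,
                            ping_limit=10, error_limit=5,
                            mark_for_deletion=lambda name: None):
    if len(failed_devices) > error_limit:
        # Too many simultaneous failures points at something systemic
        # (network, Gearman), so skip deletion entirely this cycle.
        return ping_counts
    for name in failed_devices:
        count = ping_counts.get(name, 0)
        if count < ping_limit:
            ping_counts[name] = count + 1   # keep counting failed pings
        else:
            mark_for_deletion(name)         # limit reached: unreachable
    return ping_counts

The real checker also resets the counter for any OFFLINE device that answers a ping, as the Stats changes below show.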

View File

@@ -181,6 +181,14 @@ def main():
'--stats_poll_timeout_retry', type=int, default=30,
help='gearman timeout value for retry ping request (in seconds)'
)
options.parser.add_argument(
'--stats_offline_ping_limit', type=int, default=10,
help='Number of failed pings to an OFFLINE device before deleting it'
)
options.parser.add_argument(
'--stats_device_error_limit', type=int, default=5,
help='Max number of simultaneous device failures to allow recovery on'
)
options.parser.add_argument(
'--number_of_servers', type=int, default=1,
help='number of Admin API servers, used to calculate which Admin API '
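
--number_of_servers feeds the turn-taking guard used by the stats schedulers further down: an Admin API server only runs a given check when the current minute, modulo the number of servers, equals its own server_id. A small sketch of that sharding rule:

from datetime import datetime

def is_my_turn(server_id, number_of_servers):
    # Mirrors the guard in Stats.check_offline_lbs(): checks are shared
    # round-robin across Admin API servers by the current wall-clock minute.
    return server_id == datetime.now().minute % number_of_servers

# With number_of_servers=3, server_id=1 gets minutes 1, 4, 7, ... while the
# other servers skip those cycles, so only one server runs each check.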

View File

@@ -171,6 +171,7 @@ class DevicesController(RestController):
device.floatingIpAddr = body.floatingIpAddr
device.az = body.az
device.type = body.type
device.pingCount = 0
device.status = 'OFFLINE'
device.created = None

View File

@@ -343,6 +343,7 @@ class GearmanWork(object):
device.floatingIpAddr = data['addr']
device.az = data['az']
device.type = data['type']
device.pingCount = 0
device.status = 'OFFLINE'
device.created = None
with db_session() as session:

View File

@@ -26,7 +26,7 @@ class AlertDriver(object):
def send_alert(self, message, device_id):
raise NotImplementedError()
def send_repair(self, message, device_id):
def send_delete(self, message, device_id):
raise NotImplementedError()
def send_node_change(self, message, lbid, degraded):
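
With send_repair dropped from the driver interface in favour of send_delete, any out-of-tree alert driver needs the new hook as well. A rough sketch of a minimal driver against the post-change interface (the class below is illustrative only, not part of this commit):

from libra.admin_api.stats.drivers.base import AlertDriver

class LogOnlyDriver(AlertDriver):
    # Hypothetical driver: logs every event, showing the three hooks a
    # driver is expected to provide after this change.
    def send_alert(self, message, device_id):
        self.logger.error('ALERT device {0}: {1}'.format(device_id, message))

    def send_delete(self, message, device_id):
        self.logger.warning('DELETE device {0}: {1}'.format(device_id, message))

    def send_node_change(self, message, lbid, degraded):
        self.logger.info('NODE CHANGE lb {0} degraded={1}: {2}'.format(
            lbid, degraded, message))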

View File

@@ -21,28 +21,12 @@ from libra.admin_api.stats.drivers.base import AlertDriver
class DbDriver(AlertDriver):
def send_alert(self, message, device_id):
self.update_status(message, device_id, 'ERROR')
def send_repair(self, message, device_id):
self.update_status(message, device_id, 'ONLINE')
def update_status(self, message, device_id, status):
with db_session() as session:
device = session.query(Device).\
filter(Device.id == device_id).first()
device.status = status
if status == 'ONLINE':
errmsg = "Load Balancer has recovered"
lb_status = 'ACTIVE'
elif status == 'ERROR':
errmsg = "Load Balancer has failed, attempting rebuild"
lb_status = status
else:
# This shouldnt happen
errmsg = ""
lb_status = status
device.status = "ERROR"
errmsg = "Load Balancer has failed, attempting rebuild"
lbs = session.query(
loadbalancers_devices.c.loadbalancer).\
@@ -52,7 +36,7 @@ class DbDriver(AlertDriver):
for lb in lbs:
session.query(LoadBalancer).\
filter(LoadBalancer.id == lb[0]).\
update({"status": lb_status, "errmsg": errmsg},
update({"status": "ERROR", "errmsg": errmsg},
synchronize_session='fetch')
session.flush()
@@ -60,8 +44,14 @@ class DbDriver(AlertDriver):
session.commit()
self._rebuild_device(device_id)
def send_node_change(self, message, lbid, degraded):
def send_delete(self, message, device_id):
with db_session() as session:
session.query(Device).\
filter(Device.id == device_id).\
update({"status": "DELETED"}, synchronize_session='fetch')
session.commit()
def send_node_change(self, message, lbid, degraded):
with db_session() as session:
lb = session.query(LoadBalancer).\
filter(LoadBalancer.id == lbid).first()

View File

@@ -32,9 +32,10 @@ class DatadogDriver(AlertDriver):
)
self.logger.info('Datadog alert response: {0}'.format(resp))
def send_repair(self, message, device_id):
title = 'Load balancer recovered in {0}'.format(self.args.datadog_env)
text = 'Load balancer recovered with message {0} {1}'.format(
def send_delete(self, message, device_id):
title = 'Load balancer unreachable in {0}'.\
format(self.args.datadog_env)
text = 'Load balancer unreachable with message {0} {1}'.format(
message, self.args.datadog_message_tail
)
tags = self.args.datadog_tags.split()

View File

@@ -18,8 +18,8 @@ class DummyDriver(AlertDriver):
def send_alert(self, message, device_id):
self.logger.info('Dummy alert of: {0}'.format(message))
def send_repair(self, message, device_id):
self.logger.info('Dummy repair of: {0}'.format(message))
def send_delete(self, message, device_id):
self.logger.info('Dummy delete of: {0}'.format(message))
def send_node_change(self, message, lbid, degraded):
self.logger.info('Dummy node change of: {0}'.format(message))

View File

@@ -25,44 +25,47 @@ class NodeNotFound(Exception):
class Stats(object):
PING_SECONDS = 15
REPAIR_SECONDS = 45
OFFLINE_SECONDS = 45
def __init__(self, logger, args, drivers):
self.logger = logger
self.args = args
self.drivers = drivers
self.ping_timer = None
self.repair_timer = None
self.offline_timer = None
self.ping_limit = args.stats_offline_ping_limit
self.error_limit = args.stats_device_error_limit
logger.info("Selected stats drivers: {0}".format(args.stats_driver))
self.start_ping_sched()
# TODO: completely remove repair sched, rebuild instead
#self.start_repair_sched()
self.start_offline_sched()
def shutdown(self):
if self.ping_timer:
self.ping_timer.cancel()
if self.repair_timer:
self.repair_timer.cancel()
if self.offline_timer:
self.offline_timer.cancel()
def repair_lbs(self):
def check_offline_lbs(self):
# Work out if it is our turn to run
minute = datetime.now().minute
if self.args.server_id != minute % self.args.number_of_servers:
self.logger.info('Not our turn to run repair check, sleeping')
self.start_repair_sched()
self.logger.info('Not our turn to run OFFLINE check, sleeping')
self.start_offline_sched()
return
tested = 0
repaired = 0
failed = 0
try:
tested, repaired = self._exec_repair()
tested, failed = self._exec_offline_check()
except Exception:
self.logger.exception('Uncaught exception during LB repair')
self.logger.exception('Uncaught exception during OFFLINE check')
# Need to restart timer after every ping cycle
self.logger.info('{tested} loadbalancers tested, {repaired} repaired'
.format(tested=tested, repaired=repaired))
self.start_repair_sched()
self.logger.info(
'{tested} OFFLINE loadbalancers tested, {failed} failed'
.format(tested=tested, failed=failed)
)
self.start_offline_sched()
def ping_lbs(self):
# Work out if it is our turn to run
@@ -100,94 +103,132 @@ class Stats(object):
gearman = GearJobs(self.logger, self.args)
failed_lbs, node_status = gearman.send_pings(node_list)
failed = len(failed_lbs)
# TODO: if failed over a threshold (5?) error instead of rebuild,
# something bad probably happened
if failed > self.error_limit:
self.logger.error(
'Too many simultaneous Load Balancer Failures.'
' Aborting recovery attempt'
)
return (0, 0)
if failed > 0:
self._send_fails(failed_lbs, session)
session.commit()
self._send_fails(failed_lbs)
# Process node status after lb status
self._update_nodes(node_status, session)
self._update_nodes(node_status)
session.commit()
return pings, failed
def _exec_repair(self):
def _exec_offline_check(self):
tested = 0
repaired = 0
failed = 0
node_list = []
self.logger.info('Running repair check')
self.logger.info('Running OFFLINE check')
with db_session() as session:
# Join to ensure device is in-use
devices = session.query(
Device.id, Device.name
).join(LoadBalancer.devices).\
filter(Device.status == 'ERROR').all()
).filter(Device.status == 'OFFLINE').all()
tested = len(devices)
if tested == 0:
self.logger.info('No LBs need repair')
self.logger.info('No OFFLINE Load Balancers to check')
return (0, 0)
for lb in devices:
node_list.append(lb.name)
gearman = GearJobs(self.logger, self.args)
repaired_lbs, node_status = gearman.send_repair(node_list)
repaired = len(repaired_lbs)
if repaired > 0:
self._send_repair(repaired_lbs, session)
failed_lbs = gearman.offline_check(node_list)
failed = len(failed_lbs)
if failed > self.error_limit:
self.logger.error(
'Too many simultaneous Load Balancer Failures.'
' Aborting deletion attempt'
)
return (0, 0)
if failed > 0:
self._send_delete(failed_lbs)
# Clear the ping counts for all devices not in
# the failed list
succeeded = list(set(node_list) - set(failed_lbs))
session.query(Device.name, Device.pingCount).\
filter(Device.name.in_(succeeded)).\
update({"pingCount": 0}, synchronize_session='fetch')
session.commit()
# Process node status after lb status
self._update_nodes(node_status, session)
return tested, failed
return tested, repaired
def _send_fails(self, failed_lbs, session):
for lb in failed_lbs:
data = self._get_lb(lb, session)
if not data:
self.logger.error(
'Device {0} has no Loadbalancer attached'.
format(data.id)
)
continue
message = (
'Load balancer failed\n'
'ID: {0}\n'
'IP: {1}\n'
'tenant: {2}\n'.format(
data.id, data.floatingIpAddr,
data.tenantid
)
)
for driver in self.drivers:
instance = driver(self.logger, self.args)
self.logger.info(
'Sending failure of {0} to {1}'.format(
lb, instance.__class__.__name__
def _send_fails(self, failed_lbs):
with db_session() as session:
for lb in failed_lbs:
data = self._get_lb(lb, session)
if not data:
self.logger.error(
'Device {0} has no Loadbalancer attached'.
format(lb)
)
continue
message = (
'Load balancer failed\n'
'ID: {0}\n'
'IP: {1}\n'
'tenant: {2}\n'.format(
data.id, data.floatingIpAddr,
data.tenantid
)
)
instance.send_alert(message, data.id)
def _send_repair(self, repaired_nodes, session):
for lb in repaired_nodes:
data = self._get_lb(lb, session)
message = (
'Load balancer repaired\n'
'ID: {0}\n'
'IP: {1}\n'
'tenant: {2}\n'.format(
data.id, data.floatingIpAddr,
data.tenantid
)
)
for driver in self.drivers:
instance = driver(self.logger, self.args)
self.logger.info(
'Sending repair of {0} to {1}'.format(
lb, instance.__class__.__name__
for driver in self.drivers:
instance = driver(self.logger, self.args)
self.logger.info(
'Sending failure of {0} to {1}'.format(
lb, instance.__class__.__name__
)
)
instance.send_alert(message, data.id)
session.commit()
def _send_delete(self, failed_nodes):
with db_session() as session:
for lb in failed_nodes:
# Get the current ping count
data = session.query(
Device.id, Device.pingCount).\
filter(Device.name == lb).first()
if not data:
self.logger.error(
'Device {0} no longer exists'.format(lb)
)
continue
if data.pingCount < self.ping_limit:
data.pingCount += 1
self.logger.error(
'Offline Device {0} has failed {1} ping attempts'.
format(lb, data.pingCount)
)
session.query(Device).\
filter(Device.name == lb).\
update({"pingCount": data.pingCount},
synchronize_session='fetch')
session.flush()
continue
message = (
'Load balancer {0} unreachable and marked for deletion'.
format(lb)
)
instance.send_repair(message, data.id)
for driver in self.drivers:
instance = driver(self.logger, self.args)
self.logger.info(
'Sending delete request for {0} to {1}'.format(
lb, instance.__class__.__name__
)
)
instance.send_delete(message, data.id)
session.commit()
def _get_lb(self, lb, session):
lb = session.query(
@@ -197,65 +238,70 @@ class Stats(object):
return lb
def _update_nodes(self, node_status, session):
def _update_nodes(self, node_status):
lbids = []
degraded = []
failed_nodes = dict()
repaired_nodes = dict()
errormsg = dict()
for lb, nodes in node_status.iteritems():
data = self._get_lb(lb, session)
if not data:
self.logger.error(
'Device {0} has no Loadbalancer attached'.
format(lb)
)
continue
# Iterate the list of nodes returned from the worker
# and track any status changes
for node in nodes:
# Get the last known status from the nodes table
node_data = session.query(Node).\
filter(Node.id == node['id']).first()
# Note all degraded LBs
if (node['status'] == 'DOWN' and
node_data.lbid not in degraded):
degraded.append(node_data.lbid)
new_status = None
# Compare node status to the workers status
if (node['status'] == 'DOWN' and node_data.status == 'ONLINE'):
new_status = 'ERROR'
if node_data.lbid not in failed_nodes:
failed_nodes[node_data.lbid] = []
failed_nodes[node_data.lbid].append(node['id'])
elif (node['status'] == 'UP' and node_data.status == 'ERROR'):
new_status = 'ONLINE'
if node_data.lbid not in repaired_nodes:
repaired_nodes[node_data.lbid] = []
repaired_nodes[node_data.lbid].append(node['id'])
else:
# No change
with db_session() as session:
for lb, nodes in node_status.iteritems():
data = self._get_lb(lb, session)
if not data:
self.logger.error(
'Device {0} has no Loadbalancer attached'.
format(lb)
)
continue
# Note all LBs with node status changes
if node_data.lbid not in lbids:
lbids.append(node_data.lbid)
errormsg[node_data.lbid] =\
'Node status change ID: {0}, IP: {1}, tenant: {2}'.\
format(
node_data.lbid, data.floatingIpAddr, data.tenantid
)
# Iterate the list of nodes returned from the worker
# and track any status changes
for node in nodes:
# Get the last known status from the nodes table
node_data = session.query(Node).\
filter(Node.id == node['id']).first()
# Change the node status in the node table
session.query(Node).\
filter(Node.id == node['id']).\
update({"status": new_status},
synchronize_session='fetch')
session.flush()
# Note all degraded LBs
if (node['status'] == 'DOWN' and
node_data.lbid not in degraded):
degraded.append(node_data.lbid)
new_status = None
# Compare node status to the workers status
if (node['status'] == 'DOWN' and
node_data.status == 'ONLINE'):
new_status = 'ERROR'
if node_data.lbid not in failed_nodes:
failed_nodes[node_data.lbid] = []
failed_nodes[node_data.lbid].append(node['id'])
elif (node['status'] == 'UP' and
node_data.status == 'ERROR'):
new_status = 'ONLINE'
if node_data.lbid not in repaired_nodes:
repaired_nodes[node_data.lbid] = []
repaired_nodes[node_data.lbid].append(node['id'])
else:
# No change
continue
# Note all LBs with node status changes
if node_data.lbid not in lbids:
lbids.append(node_data.lbid)
errormsg[node_data.lbid] =\
'Node status change ID:'\
' {0}, IP: {1}, tenant: {2}'.\
format(
node_data.lbid,
data.floatingIpAddr,
data.tenantid
)
# Change the node status in the node table
session.query(Node).\
filter(Node.id == node['id']).\
update({"status": new_status},
synchronize_session='fetch')
session.flush()
session.commit()
# Generate a status message per LB for the alert.
@@ -300,15 +346,17 @@ class Stats(object):
self.ping_timer = threading.Timer(sleeptime, self.ping_lbs, ())
self.ping_timer.start()
def start_repair_sched(self):
# Always try to hit the expected second mark for repairs
def start_offline_sched(self):
# Always try to hit the expected second mark for offline checks
seconds = datetime.now().second
if seconds < self.REPAIR_SECONDS:
sleeptime = self.REPAIR_SECONDS - seconds
if seconds < self.OFFLINE_SECONDS:
sleeptime = self.OFFLINE_SECONDS - seconds
else:
sleeptime = 60 - (seconds - self.REPAIR_SECONDS)
sleeptime = 60 - (seconds - self.OFFLINE_SECONDS)
self.logger.info('LB repair check timer sleeping for {secs} seconds'
self.logger.info('LB offline check timer sleeping for {secs} seconds'
.format(secs=sleeptime))
self.repair_timer = threading.Timer(sleeptime, self.repair_lbs, ())
self.repair_timer.start()
self.offline_timer = threading.Timer(
sleeptime, self.check_offline_lbs, ()
)
self.offline_timer.start()
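
The offline timer reuses the second-mark alignment the other schedulers already use: each cycle sleeps until the wall clock next reaches OFFLINE_SECONDS (:45), keeping the OFFLINE check offset from the :15 ping check. A worked example of the sleep calculation:

from datetime import datetime

OFFLINE_SECONDS = 45

def offline_sleeptime(now=None):
    # Same arithmetic as start_offline_sched(): figure out how long to
    # sleep so the timer fires the next time the seconds hand hits :45.
    seconds = (now or datetime.now()).second
    if seconds < OFFLINE_SECONDS:
        return OFFLINE_SECONDS - seconds
    return 60 - (seconds - OFFLINE_SECONDS)

# At :10 past the minute this sleeps 35s, at :50 it sleeps 55s; both wake
# at the next :45 mark, giving roughly one check per minute per server.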

View File

@@ -107,10 +107,9 @@ class GearJobs(object):
return failed_list, node_status
def send_repair(self, node_list):
def offline_check(self, node_list):
list_of_jobs = []
repaired_list = []
node_status = dict()
failed_list = []
job_data = {"hpcs_action": "STATS"}
for node in node_list:
list_of_jobs.append(dict(task=str(node), data=job_data))
@@ -120,18 +119,11 @@ class GearJobs(object):
)
for ping in submitted_pings:
if ping.state == JOB_UNKNOWN:
# TODO: Gearman server failed, ignoring for now
self.logger.error('Gearman Job server fail')
continue
self.logger.error(
"Gearman Job server failed during OFFLINE check of {0}".
format(ping.job.task)
)
elif ping.timed_out:
# Ping timeout
continue
elif ping.result['hpcs_response'] == 'FAIL':
# Error returned by Gearman
continue
else:
repaired_list.append(ping.job.task)
if 'nodes' in ping.result:
node_status[ping.job.task] = ping.result['nodes']
failed_list.append(ping.job.task)
return repaired_list, node_status
return failed_list
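
As the new loop reads, a device counts as failed when its STATS ping times out or comes back with hpcs_response FAIL, while a JOB_UNKNOWN state is logged as a Gearman job-server problem rather than held against the device. A condensed sketch under that reading:

from gearman.constants import JOB_UNKNOWN  # same constant the module already checks

def classify_offline_pings(submitted_pings, logger):
    # Sketch of the offline_check() loop: collect the devices whose
    # STATS ping failed.
    failed_list = []
    for ping in submitted_pings:
        if ping.state == JOB_UNKNOWN:
            # Gearman job-server trouble, not necessarily a dead device
            logger.error('Gearman Job server failed during OFFLINE check '
                         'of {0}'.format(ping.job.task))
        elif ping.timed_out:
            failed_list.append(ping.job.task)           # no answer in time
        elif ping.result['hpcs_response'] == 'FAIL':
            failed_list.append(ping.job.task)           # worker reported failure
    return failed_list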

View File

@@ -182,11 +182,17 @@ class HealthMonitorController(RestController):
lb.status = 'PENDING_UPDATE'
device = session.query(
Device.id, Device.name
Device.id, Device.name, Device.status
).join(LoadBalancer.devices).\
filter(LoadBalancer.id == self.lbid).\
first()
if device.status == 'ERROR':
session.rollback()
raise ClientSideError(
'Cannot modify a Load Balancer in an ERROR state'
)
return_data = LBMonitorResp()
return_data.type = data["type"]
return_data.delay = str(data["delay"])

View File

@@ -292,6 +292,14 @@ class LoadBalancersController(RestController):
).join(Device.vip).\
filter(Vip.id == virtual_id).\
first()
if device.status == 'ERROR':
session.rollback()
raise ClientSideError(
'Cannot add a Load Balancer to a device'
' in an ERROR state'
)
old_lb = session.query(
LoadBalancer
).join(LoadBalancer.devices).\
@@ -409,10 +417,17 @@ class LoadBalancersController(RestController):
lb.status = 'PENDING_UPDATE'
device = session.query(
Device.id, Device.name
Device.id, Device.name, Device.status
).join(LoadBalancer.devices).\
filter(LoadBalancer.id == self.lbid).\
first()
if device.status == 'ERROR':
session.rollback()
raise ClientSideError(
'Cannot update a Load Balancer in an ERROR state'
)
session.commit()
submit_job(
'UPDATE', device.name, device.id, lb.id

View File

@@ -48,10 +48,17 @@ class LogsController(RestController):
load_balancer.status = 'PENDING_UPDATE'
device = session.query(
Device.id, Device.name
Device.id, Device.name, Device.status
).join(LoadBalancer.devices).\
filter(LoadBalancer.id == self.lbid).\
first()
if device.status == 'ERROR':
session.rollback()
raise ClientSideError(
'Load Balancer is currently in an ERROR state'
)
session.commit()
data = {
'deviceid': device.id

View File

@@ -195,10 +195,17 @@ class NodesController(RestController):
)
)
device = session.query(
Device.id, Device.name
Device.id, Device.name, Device.status
).join(LoadBalancer.devices).\
filter(LoadBalancer.id == self.lbid).\
first()
if device.status == 'ERROR':
session.rollback()
raise ClientSideError(
'Cannot modify a Load Balancer in an ERROR state'
)
session.commit()
submit_job(
'UPDATE', device.name, device.id, self.lbid
@@ -250,10 +257,17 @@ class NodesController(RestController):
lb.status = 'PENDING_UPDATE'
device = session.query(
Device.id, Device.name
Device.id, Device.name, Device.status
).join(LoadBalancer.devices).\
filter(LoadBalancer.id == self.lbid).\
first()
if device.status == 'ERROR':
session.rollback()
raise ClientSideError(
'Cannot modify a Load Balancer in an ERROR state'
)
session.commit()
submit_job(
'UPDATE', device.name, device.id, lb.id
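
The ERROR-state guard added here is the same pattern repeated across the health monitor, load balancer, logs and nodes controllers: select Device.status alongside id and name, then refuse the operation when the device is in ERROR. Condensed into one hypothetical helper purely for illustration (the commit itself keeps the checks inline):

from wsme.exc import ClientSideError  # the exception the controllers already raise

def reject_if_device_error(session, device, action='modify'):
    # Hypothetical consolidation of the guard this commit repeats in the
    # health monitor, load balancer, logs and nodes controllers.
    if device.status == 'ERROR':
        session.rollback()
        raise ClientSideError(
            'Cannot {0} a Load Balancer in an ERROR state'.format(action)
        )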

View File

@@ -79,6 +79,7 @@ class Device(DeclarativeBase):
publicIpAddr = Column(u'publicIpAddr', VARCHAR(length=128), nullable=False)
status = Column(u'status', VARCHAR(length=128), nullable=False)
type = Column(u'type', VARCHAR(length=128), nullable=False)
pingCount = Column(u'pingCount', INTEGER(), nullable=False)
updated = Column(u'updated', FormatedDateTime(), nullable=False)
vip = relationship("Vip", uselist=False, backref="devices")

View File

@@ -52,6 +52,7 @@ CREATE TABLE devices (
type VARCHAR(128) NOT NULL, # text description of type of device, e.g. 'HAProxy'
created TIMESTAMP NOT NULL DEFAULT '0000-00-00 00:00:00', # timestamp of when device was created (default sets to current timestamp on row create)
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, # timestamp of when device was last updated
pingCount INT NOT NULL, # Number of ping failures against an OFFLINE device
status VARCHAR(128) NOT NULL, # status of device 'OFFLINE', 'ONLINE', 'ERROR', this value is reported by the device
PRIMARY KEY (id)
) DEFAULT CHARSET utf8 DEFAULT COLLATE utf8_general_ci;