[ADMIN_API]: Added OFFLINE checker and bug fixes
There is now a periodic checker that tests connectivity to all HAProxy workers. If the per-minute connectivity test fails more than the configured limit (10 by default), the Nova device is deleted and a new one is allocated. This change also stops allowing API updates to load balancers, nodes, monitors, and logs for devices that are in the ERROR state.

Change-Id: I64e0029a33f4ca02917b2993b7dd9937a92a428b
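In rough terms, each failed check bumps a per-device failure counter, and a device is only marked for deletion once that counter passes the limit; devices that answer again have their counter cleared. A minimal sketch of that decision, with an illustrative helper name rather than the actual module layout:

    # Hypothetical helper mirroring the pingCount logic added in this change.
    def handle_failed_ping(device, ping_limit=10):
        if device.pingCount < ping_limit:
            device.pingCount += 1   # keep the device, retry on the next cycle
            return 'RETRY'
        return 'DELETED'            # limit reached: delete and rebuild the device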
@@ -110,6 +110,15 @@ Command Line Options
    How long to wait until we consider the second and final ping check
    failed. Default is 30 seconds.

+.. option:: --stats_offline_ping_limit <COUNT>
+
+   How many times to ping an OFFLINE load balancer before considering
+   it unreachable and marking it for deletion.
+
+.. option:: --stats_device_error_limit <COUNT>
+
+   Maximum number of simultaneous device failures to allow recovery on.
+
 .. option:: --number_of_servers <NUMBER_OF_SERVER>

    The number of Admin API servers in the system.
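The new limits sit alongside --number_of_servers, which feeds a simple turn-taking rule: an Admin API server runs a given minute's check only when the current minute maps to its server_id, so the check is not duplicated across servers. A minimal sketch of that rule (the same modulo test appears in the stats scheduler later in this diff; the helper name is illustrative):

    from datetime import datetime

    def is_my_turn(server_id, number_of_servers):
        # Only one Admin API server runs the check for any given minute.
        return server_id == datetime.now().minute % number_of_servers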
@@ -180,6 +180,14 @@ def main():
         '--stats_poll_timeout_retry', type=int, default=30,
         help='gearman timeout value for retry ping request (in seconds)'
     )
+    options.parser.add_argument(
+        '--stats_offline_ping_limit', type=int, default=10,
+        help='Number of failed pings to an OFFLINE device before deleting it'
+    )
+    options.parser.add_argument(
+        '--stats_device_error_limit', type=int, default=5,
+        help='Max number of simultaneous device failures to allow recovery on'
+    )
     options.parser.add_argument(
         '--number_of_servers', type=int, default=1,
         help='number of Admin API servers, used to calculate which Admin API '
@@ -171,6 +171,7 @@ class DevicesController(RestController):
         device.floatingIpAddr = body.floatingIpAddr
         device.az = body.az
         device.type = body.type
+        device.pingCount = 0
         device.status = 'OFFLINE'
         device.created = None

@@ -343,6 +343,7 @@ class GearmanWork(object):
         device.floatingIpAddr = data['addr']
         device.az = data['az']
         device.type = data['type']
+        device.pingCount = 0
         device.status = 'OFFLINE'
         device.created = None
         with db_session() as session:
@@ -26,7 +26,7 @@ class AlertDriver(object):
     def send_alert(self, message, device_id):
         raise NotImplementedError()

-    def send_repair(self, message, device_id):
+    def send_delete(self, message, device_id):
         raise NotImplementedError()

     def send_node_change(self, message, lbid, degraded):
@@ -21,28 +21,12 @@ from libra.admin_api.stats.drivers.base import AlertDriver

 class DbDriver(AlertDriver):
     def send_alert(self, message, device_id):
-        self.update_status(message, device_id, 'ERROR')
-
-    def send_repair(self, message, device_id):
-        self.update_status(message, device_id, 'ONLINE')
-
-    def update_status(self, message, device_id, status):
         with db_session() as session:
             device = session.query(Device).\
                 filter(Device.id == device_id).first()

-            device.status = status
-
-            if status == 'ONLINE':
-                errmsg = "Load Balancer has recovered"
-                lb_status = 'ACTIVE'
-            elif status == 'ERROR':
-                errmsg = "Load Balancer has failed, attempting rebuild"
-                lb_status = status
-            else:
-                # This shouldnt happen
-                errmsg = ""
-                lb_status = status
+            device.status = "ERROR"
+            errmsg = "Load Balancer has failed, attempting rebuild"

             lbs = session.query(
                 loadbalancers_devices.c.loadbalancer).\
@@ -52,7 +36,7 @@ class DbDriver(AlertDriver):
             for lb in lbs:
                 session.query(LoadBalancer).\
                     filter(LoadBalancer.id == lb[0]).\
-                    update({"status": lb_status, "errmsg": errmsg},
+                    update({"status": "ERROR", "errmsg": errmsg},
                            synchronize_session='fetch')

             session.flush()
@@ -60,8 +44,14 @@ class DbDriver(AlertDriver):
             session.commit()
             self._rebuild_device(device_id)

-    def send_node_change(self, message, lbid, degraded):
+    def send_delete(self, message, device_id):
+        with db_session() as session:
+            session.query(Device).\
+                filter(Device.id == device_id).\
+                update({"status": "DELETED"}, synchronize_session='fetch')
+            session.commit()
+
+    def send_node_change(self, message, lbid, degraded):
         with db_session() as session:
             lb = session.query(LoadBalancer).\
                 filter(LoadBalancer.id == lbid).first()
@@ -32,9 +32,10 @@ class DatadogDriver(AlertDriver):
         )
         self.logger.info('Datadog alert response: {0}'.format(resp))

-    def send_repair(self, message, device_id):
-        title = 'Load balancer recovered in {0}'.format(self.args.datadog_env)
-        text = 'Load balancer recovered with message {0} {1}'.format(
+    def send_delete(self, message, device_id):
+        title = 'Load balancer unreachable in {0}'.\
+            format(self.args.datadog_env)
+        text = 'Load balancer unreachable with message {0} {1}'.format(
             message, self.args.datadog_message_tail
         )
         tags = self.args.datadog_tags.split()
@@ -18,8 +18,8 @@ class DummyDriver(AlertDriver):
     def send_alert(self, message, device_id):
         self.logger.info('Dummy alert of: {0}'.format(message))

-    def send_repair(self, message, device_id):
-        self.logger.info('Dummy repair of: {0}'.format(message))
+    def send_delete(self, message, device_id):
+        self.logger.info('Dummy delete of: {0}'.format(message))

     def send_node_change(self, message, lbid, degraded):
         self.logger.info('Dummy node change of: {0}'.format(message))
@@ -25,44 +25,47 @@ class NodeNotFound(Exception):
 class Stats(object):

     PING_SECONDS = 15
-    REPAIR_SECONDS = 45
+    OFFLINE_SECONDS = 45

     def __init__(self, logger, args, drivers):
         self.logger = logger
         self.args = args
         self.drivers = drivers
         self.ping_timer = None
-        self.repair_timer = None
+        self.offline_timer = None
+        self.ping_limit = args.stats_offline_ping_limit
+        self.error_limit = args.stats_device_error_limit

         logger.info("Selected stats drivers: {0}".format(args.stats_driver))

         self.start_ping_sched()
-        # TODO: completely remove repaid sched, rebuild instead
-        #self.start_repair_sched()
+        self.start_offline_sched()

     def shutdown(self):
         if self.ping_timer:
             self.ping_timer.cancel()
-        if self.repair_timer:
-            self.repair_timer.cancel()
+        if self.offline_timer:
+            self.offline_timer.cancel()

-    def repair_lbs(self):
+    def check_offline_lbs(self):
         # Work out if it is our turn to run
         minute = datetime.now().minute
         if self.args.server_id != minute % self.args.number_of_servers:
-            self.logger.info('Not our turn to run repair check, sleeping')
-            self.start_repair_sched()
+            self.logger.info('Not our turn to run OFFLINE check, sleeping')
+            self.start_offline_sched()
             return
         tested = 0
-        repaired = 0
+        failed = 0
         try:
-            tested, repaired = self._exec_repair()
+            tested, failed = self._exec_offline_check()
         except Exception:
-            self.logger.exception('Uncaught exception during LB repair')
+            self.logger.exception('Uncaught exception during OFFLINE check')
         # Need to restart timer after every ping cycle
-        self.logger.info('{tested} loadbalancers tested, {repaired} repaired'
-                         .format(tested=tested, repaired=repaired))
-        self.start_repair_sched()
+        self.logger.info(
+            '{tested} OFFLINE loadbalancers tested, {failed} failed'
+            .format(tested=tested, failed=failed)
+        )
+        self.start_offline_sched()

     def ping_lbs(self):
         # Work out if it is our turn to run
@@ -100,48 +103,65 @@ class Stats(object):
             gearman = GearJobs(self.logger, self.args)
             failed_lbs, node_status = gearman.send_pings(node_list)
             failed = len(failed_lbs)
-            # TODO: if failed over a threshold (5?) error instead of rebuild,
-            # something bad probably happened
+            if failed > self.error_limit:
+                self.logger.error(
+                    'Too many simultaneous Load Balancer Failures.'
+                    ' Aborting recovery attempt'
+                )
+                return (0, 0)
+
             if failed > 0:
-                self._send_fails(failed_lbs, session)
-                session.commit()
+                self._send_fails(failed_lbs)

             # Process node status after lb status
-            self._update_nodes(node_status, session)
+            self._update_nodes(node_status)
+            session.commit()

         return pings, failed

-    def _exec_repair(self):
+    def _exec_offline_check(self):
         tested = 0
-        repaired = 0
+        failed = 0
         node_list = []
-        self.logger.info('Running repair check')
+        self.logger.info('Running OFFLINE check')
         with db_session() as session:
             # Join to ensure device is in-use
             devices = session.query(
                 Device.id, Device.name
-            ).join(LoadBalancer.devices).\
-                filter(Device.status == 'ERROR').all()
+            ).filter(Device.status == 'OFFLINE').all()

             tested = len(devices)
             if tested == 0:
-                self.logger.info('No LBs need repair')
+                self.logger.info('No OFFLINE Load Balancers to check')
                 return (0, 0)
             for lb in devices:
                 node_list.append(lb.name)
             gearman = GearJobs(self.logger, self.args)
-            repaired_lbs, node_status = gearman.send_repair(node_list)
-            repaired = len(repaired_lbs)
-            if repaired > 0:
-                self._send_repair(repaired_lbs, session)
+            failed_lbs = gearman.offline_check(node_list)
+            failed = len(failed_lbs)
+            if failed > self.error_limit:
+                self.logger.error(
+                    'Too many simultaneous Load Balancer Failures.'
+                    ' Aborting deletion attempt'
+                )
+                return (0, 0)
+
+            if failed > 0:
+                self._send_delete(failed_lbs)
+
+            # Clear the ping counts for all devices not in
+            # the failed list
+            succeeded = list(set(node_list) - set(failed_lbs))
+            session.query(Device.name, Device.pingCount).\
+                filter(Device.name.in_(succeeded)).\
+                update({"pingCount": 0}, synchronize_session='fetch')

             session.commit()

-            # Process node status after lb status
-            self._update_nodes(node_status, session)
-
-        return tested, repaired
+        return tested, failed

-    def _send_fails(self, failed_lbs, session):
+    def _send_fails(self, failed_lbs):
+        with db_session() as session:
             for lb in failed_lbs:
                 data = self._get_lb(lb, session)
                 if not data:
@@ -167,27 +187,48 @@ class Stats(object):
                         )
                     )
                     instance.send_alert(message, data.id)
+            session.commit()

-    def _send_repair(self, repaired_nodes, session):
-        for lb in repaired_nodes:
-            data = self._get_lb(lb, session)
-            message = (
-                'Load balancer repaired\n'
-                'ID: {0}\n'
-                'IP: {1}\n'
-                'tenant: {2}\n'.format(
-                    data.id, data.floatingIpAddr,
-                    data.tenantid
-                )
-            )
-            for driver in self.drivers:
-                instance = driver(self.logger, self.args)
-                self.logger.info(
-                    'Sending repair of {0} to {1}'.format(
-                        lb, instance.__class__.__name__
-                    )
-                )
-                instance.send_repair(message, data.id)
+    def _send_delete(self, failed_nodes):
+        with db_session() as session:
+            for lb in failed_nodes:
+                # Get the current ping count
+                data = session.query(
+                    Device.id, Device.pingCount).\
+                    filter(Device.name == lb).first()
+
+                if not data:
+                    self.logger.error(
+                        'Device {0} no longer exists'.format(data.id)
+                    )
+                    continue
+
+                if data.pingCount < self.ping_limit:
+                    data.pingCount += 1
+                    self.logger.error(
+                        'Offline Device {0} has failed {1} ping attempts'.
+                        format(lb, data.pingCount)
+                    )
+                    session.query(Device).\
+                        filter(Device.name == lb).\
+                        update({"pingCount": data.pingCount},
+                               synchronize_session='fetch')
+                    session.flush()
+                    continue
+
+                message = (
+                    'Load balancer {0} unreachable and marked for deletion'.
+                    format(lb)
+                )
+                for driver in self.drivers:
+                    instance = driver(self.logger, self.args)
+                    self.logger.info(
+                        'Sending delete request for {0} to {1}'.format(
+                            lb, instance.__class__.__name__
+                        )
+                    )
+                    instance.send_delete(message, data.id)
+            session.commit()

     def _get_lb(self, lb, session):
         lb = session.query(
@@ -197,12 +238,13 @@ class Stats(object):

         return lb

-    def _update_nodes(self, node_status, session):
+    def _update_nodes(self, node_status):
         lbids = []
         degraded = []
         failed_nodes = dict()
         repaired_nodes = dict()
         errormsg = dict()
+        with db_session() as session:
             for lb, nodes in node_status.iteritems():
                 data = self._get_lb(lb, session)
                 if not data:
@@ -226,12 +268,14 @@ class Stats(object):

                 new_status = None
                 # Compare node status to the workers status
-                if (node['status'] == 'DOWN' and node_data.status == 'ONLINE'):
+                if (node['status'] == 'DOWN' and
+                        node_data.status == 'ONLINE'):
                     new_status = 'ERROR'
                     if node_data.lbid not in failed_nodes:
                         failed_nodes[node_data.lbid] = []
                     failed_nodes[node_data.lbid].append(node['id'])
-                elif (node['status'] == 'UP' and node_data.status == 'ERROR'):
+                elif (node['status'] == 'UP' and
+                        node_data.status == 'ERROR'):
                     new_status = 'ONLINE'
                     if node_data.lbid not in repaired_nodes:
                         repaired_nodes[node_data.lbid] = []
@@ -244,9 +288,12 @@ class Stats(object):
                 if node_data.lbid not in lbids:
                     lbids.append(node_data.lbid)
                     errormsg[node_data.lbid] =\
-                        'Node status change ID: {0}, IP: {1}, tenant: {2}'.\
+                        'Node status change ID:'\
+                        ' {0}, IP: {1}, tenant: {2}'.\
                         format(
-                            node_data.lbid, data.floatingIpAddr, data.tenantid
+                            node_data.lbid,
+                            data.floatingIpAddr,
+                            data.tenantid
                         )

                 # Change the node status in the node table
@@ -255,7 +302,6 @@ class Stats(object):
                     update({"status": new_status},
                            synchronize_session='fetch')
                 session.flush()

             session.commit()
-
         # Generate a status message per LB for the alert.
@@ -300,15 +346,17 @@ class Stats(object):
         self.ping_timer = threading.Timer(sleeptime, self.ping_lbs, ())
         self.ping_timer.start()

-    def start_repair_sched(self):
-        # Always try to hit the expected second mark for repairs
+    def start_offline_sched(self):
+        # Always try to hit the expected second mark for offline checks
         seconds = datetime.now().second
-        if seconds < self.REPAIR_SECONDS:
-            sleeptime = self.REPAIR_SECONDS - seconds
+        if seconds < self.OFFLINE_SECONDS:
+            sleeptime = self.OFFLINE_SECONDS - seconds
         else:
-            sleeptime = 60 - (seconds - self.REPAIR_SECONDS)
+            sleeptime = 60 - (seconds - self.OFFLINE_SECONDS)

-        self.logger.info('LB repair check timer sleeping for {secs} seconds'
+        self.logger.info('LB offline check timer sleeping for {secs} seconds'
                          .format(secs=sleeptime))
-        self.repair_timer = threading.Timer(sleeptime, self.repair_lbs, ())
-        self.repair_timer.start()
+        self.offline_timer = threading.Timer(
+            sleeptime, self.check_offline_lbs, ()
+        )
+        self.offline_timer.start()
@@ -107,10 +107,9 @@ class GearJobs(object):

         return failed_list, node_status

-    def send_repair(self, node_list):
+    def offline_check(self, node_list):
         list_of_jobs = []
-        repaired_list = []
-        node_status = dict()
+        failed_list = []
         job_data = {"hpcs_action": "STATS"}
         for node in node_list:
             list_of_jobs.append(dict(task=str(node), data=job_data))
@@ -120,18 +119,11 @@
         )
         for ping in submitted_pings:
             if ping.state == JOB_UNKNOWN:
-                # TODO: Gearman server failed, ignoring for now
-                self.logger.error('Gearman Job server fail')
-                continue
+                self.logger.error(
+                    "Gearman Job server failed during OFFLINE check of {0}".
+                    format(ping.job.task)
+                )
             elif ping.timed_out:
-                # Ping timeout
-                continue
-            elif ping.result['hpcs_response'] == 'FAIL':
-                # Error returned by Gearman
-                continue
-            else:
-                repaired_list.append(ping.job.task)
-                if 'nodes' in ping.result:
-                    node_status[ping.job.task] = ping.result['nodes']
+                failed_list.append(ping.job.task)

-        return repaired_list, node_status
+        return failed_list
@@ -182,11 +182,17 @@ class HealthMonitorController(RestController):

             lb.status = 'PENDING_UPDATE'
             device = session.query(
-                Device.id, Device.name
+                Device.id, Device.name, Device.status
             ).join(LoadBalancer.devices).\
                 filter(LoadBalancer.id == self.lbid).\
                 first()

+            if device.status == 'ERROR':
+                session.rollback()
+                raise ClientSideError(
+                    'Cannot modify a Load Balancer in an ERROR state'
+                )
+
             return_data = LBMonitorResp()
             return_data.type = data["type"]
             return_data.delay = str(data["delay"])
@@ -292,6 +292,14 @@ class LoadBalancersController(RestController):
             ).join(Device.vip).\
                 filter(Vip.id == virtual_id).\
                 first()
+
+            if device.status == 'ERROR':
+                session.rollback()
+                raise ClientSideError(
+                    'Cannot add a Load Balancer to a device'
+                    ' in an ERROR state'
+                )
+
             old_lb = session.query(
                 LoadBalancer
             ).join(LoadBalancer.devices).\
@@ -409,10 +417,17 @@ class LoadBalancersController(RestController):

             lb.status = 'PENDING_UPDATE'
             device = session.query(
-                Device.id, Device.name
+                Device.id, Device.name, Device.status
             ).join(LoadBalancer.devices).\
                 filter(LoadBalancer.id == self.lbid).\
                 first()

+            if device.status == 'ERROR':
+                session.rollback()
+                raise ClientSideError(
+                    'Cannot update a Load Balancer in an ERROR state'
+                )
+
             session.commit()
             submit_job(
                 'UPDATE', device.name, device.id, lb.id
@@ -48,10 +48,17 @@ class LogsController(RestController):

             load_balancer.status = 'PENDING_UPDATE'
             device = session.query(
-                Device.id, Device.name
+                Device.id, Device.name, Device.status
             ).join(LoadBalancer.devices).\
                 filter(LoadBalancer.id == self.lbid).\
                 first()

+            if device.status == 'ERROR':
+                session.rollback()
+                raise ClientSideError(
+                    'Load Balancer is currently in an ERROR state'
+                )
+
             session.commit()
             data = {
                 'deviceid': device.id
@@ -195,10 +195,17 @@ class NodesController(RestController):
                 )
             )
             device = session.query(
-                Device.id, Device.name
+                Device.id, Device.name, Device.status
             ).join(LoadBalancer.devices).\
                 filter(LoadBalancer.id == self.lbid).\
                 first()

+            if device.status == 'ERROR':
+                session.rollback()
+                raise ClientSideError(
+                    'Cannot modify a Load Balancer in an ERROR state'
+                )
+
             session.commit()
             submit_job(
                 'UPDATE', device.name, device.id, self.lbid
@@ -250,10 +257,17 @@ class NodesController(RestController):

             lb.status = 'PENDING_UPDATE'
             device = session.query(
-                Device.id, Device.name
+                Device.id, Device.name, Device.status
             ).join(LoadBalancer.devices).\
                 filter(LoadBalancer.id == self.lbid).\
                 first()

+            if device.status == 'ERROR':
+                session.rollback()
+                raise ClientSideError(
+                    'Cannot modify a Load Balancer in an ERROR state'
+                )
+
             session.commit()
             submit_job(
                 'UPDATE', device.name, device.id, lb.id
@@ -79,6 +79,7 @@ class Device(DeclarativeBase):
     publicIpAddr = Column(u'publicIpAddr', VARCHAR(length=128), nullable=False)
     status = Column(u'status', VARCHAR(length=128), nullable=False)
     type = Column(u'type', VARCHAR(length=128), nullable=False)
+    pingCount = Column(u'pingCount', INTEGER(), nullable=False)
     updated = Column(u'updated', FormatedDateTime(), nullable=False)
     vip = relationship("Vip", uselist=False, backref="devices")

@@ -52,6 +52,7 @@ CREATE TABLE devices (
    type VARCHAR(128) NOT NULL, # text description of type of device, e.g. 'HAProxy'
    created TIMESTAMP NOT NULL DEFAULT '0000-00-00 00:00:00', # timestamp of when device was created (default sets to current timestamp on row create)
    updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, # timestamp of when device was last updated
+   pingCount INT NOT NULL, # Number of ping failures against an OFFLINE device
    status VARCHAR(128) NOT NULL, # status of device 'OFFLINE', 'ONLINE', 'ERROR', this value is reported by the device
    PRIMARY KEY (id)
 ) DEFAULT CHARSET utf8 DEFAULT COLLATE utf8_general_ci;