Merge "[WORKER] Implement new STATS request"
@@ -51,12 +51,11 @@

#driver = haproxy
#pid = /var/run/libra/libra_worker.pid
#logfile = /var/log/libra/libra_worker.log

# HAProxy driver options for the worker
[worker:haproxy]
#service = ubuntu
#logfile = /var/log/haproxy.log
#statsfile = /var/log/haproxy.stats


#-----------------------------------------------------------------------
@@ -67,7 +66,6 @@

# Options with defaults
#pid = /var/run/libra/libra_mgm.pid
#logfile = /var/log/libra/libra_mgm.log
#threads = 4
#rm_fip_ignore_500 = false
#nova_insecure = false
@@ -101,7 +99,6 @@ nova_tenant_id = TENANTID
# Options with defaults
#host = 0.0.0.0
#port = 8889
#logfile = /var/log/libra/libra_admin_api.log
#pid = /var/run/libra/libra_admin_api.pid
#expire_days = 0
#node_pool_size = 10
@@ -140,7 +137,6 @@ datadog_tags = service:lbaas
#host = 0.0.0.0
#port = 443
#keystone_module = keystoneclient.middleware.auth_token:AuthProtocol
#logfile = /var/log/libra/libra_api.log
#pid = /var/run/libra/libra_api.pid

# Required options
@@ -1,55 +0,0 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime

from libra.tests.base import TestCase
from libra.worker.drivers.haproxy.lbstats import LBStatistics


class TestLBStatistics(TestCase):
    def setUp(self):
        super(TestLBStatistics, self).setUp()
        self.stats = LBStatistics()

    def testInitValues(self):
        now = datetime.datetime.utcnow()
        ts = self.stats.utc_timestamp
        self.assertEquals(ts.year, now.year)
        self.assertEquals(ts.month, now.month)
        self.assertEquals(ts.day, now.day)
        self.assertEquals(ts.hour, now.hour)
        self.assertEquals(self.stats.bytes_out, 0L)
        self.assertEquals(self.stats.bytes_in, 0L)

    def testSetBytesIn(self):
        self.stats.bytes_in = 99L
        self.assertEquals(self.stats.bytes_in, 99L)
        e = self.assertRaises(TypeError, setattr, self.stats,
                              'bytes_in', "NaN")
        self.assertEqual("Must be a long integer: 'NaN'", e.message)

    def testSetBytesOut(self):
        self.stats.bytes_out = 100L
        self.assertEquals(self.stats.bytes_out, 100L)
        e = self.assertRaises(TypeError, setattr, self.stats,
                              'bytes_out', "NaN")
        self.assertEqual("Must be a long integer: 'NaN'", e.message)

    def testSetUTCTimestamp(self):
        ts = datetime.datetime.utcnow()
        self.stats.utc_timestamp = ts
        self.assertEquals(self.stats.utc_timestamp, ts)
        e = self.assertRaises(TypeError, setattr, self.stats,
                              'utc_timestamp', "NaN")
        self.assertEqual("Must be a datetime.datetime: 'NaN'", e.message)
90  libra/tests/worker/test_stats.py  Normal file
@@ -0,0 +1,90 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import os.path
import tempfile

from libra.tests.base import TestCase
from libra.worker.drivers.haproxy.stats import StatisticsManager


class TestStatisticsManager(TestCase):

    def setUp(self):
        super(TestStatisticsManager, self).setUp()
        self.tmpfile = tempfile.gettempdir() + "/tstLibraTestStatsMgr.tmp"
        self.mgr = StatisticsManager(self.tmpfile)

    def tearDown(self):
        if os.path.exists(self.tmpfile):
            os.remove(self.tmpfile)
        super(TestStatisticsManager, self).tearDown()

    def testReadNoStatsFile(self):
        self.assertEquals(self.mgr.get_start(), None)
        self.assertEquals(self.mgr.get_end(), None)
        self.assertEquals(self.mgr.get_last_tcp_bytes(), 0)
        self.assertEquals(self.mgr.get_last_http_bytes(), 0)
        self.assertEquals(self.mgr.get_unreported_tcp_bytes(), 0)
        self.assertEquals(self.mgr.get_unreported_http_bytes(), 0)

    def testSave(self):
        start_ts = datetime.datetime(2013, 1, 31, 12, 10, 30, 123456)
        end_ts = start_ts + datetime.timedelta(minutes=5)
        tcp_bytes = 1024
        http_bytes = 2048
        unreported_tcp_bytes = 3000
        unreported_http_bytes = 4000

        self.mgr.save(start_ts, end_ts,
                      tcp_bytes=tcp_bytes, http_bytes=http_bytes)
        self.mgr.read()

        self.assertEquals(self.mgr.get_start(), start_ts)
        self.assertEquals(self.mgr.get_end(), end_ts)
        self.assertEquals(self.mgr.get_last_tcp_bytes(), tcp_bytes)
        self.assertEquals(self.mgr.get_last_http_bytes(), http_bytes)
        self.assertEquals(self.mgr.get_unreported_tcp_bytes(), 0)
        self.assertEquals(self.mgr.get_unreported_http_bytes(), 0)

        self.mgr.save(start_ts, end_ts,
                      unreported_tcp_bytes=unreported_tcp_bytes,
                      unreported_http_bytes=unreported_http_bytes)
        self.mgr.read()

        self.assertEquals(self.mgr.get_start(), start_ts)
        self.assertEquals(self.mgr.get_end(), end_ts)
        self.assertEquals(self.mgr.get_last_tcp_bytes(), 0)
        self.assertEquals(self.mgr.get_last_http_bytes(), 0)
        self.assertEquals(self.mgr.get_unreported_tcp_bytes(),
                          unreported_tcp_bytes)
        self.assertEquals(self.mgr.get_unreported_http_bytes(),
                          unreported_http_bytes)

        self.mgr.save(start_ts, end_ts,
                      tcp_bytes=tcp_bytes,
                      http_bytes=http_bytes,
                      unreported_tcp_bytes=unreported_tcp_bytes,
                      unreported_http_bytes=unreported_http_bytes)
        self.mgr.read()

        self.assertEquals(self.mgr.get_start(), start_ts)
        self.assertEquals(self.mgr.get_end(), end_ts)
        self.assertEquals(self.mgr.get_last_tcp_bytes(), tcp_bytes)
        self.assertEquals(self.mgr.get_last_http_bytes(), http_bytes)
        self.assertEquals(self.mgr.get_unreported_tcp_bytes(),
                          unreported_tcp_bytes)
        self.assertEquals(self.mgr.get_unreported_http_bytes(),
                          unreported_http_bytes)
@@ -21,7 +21,7 @@ from libra import __release__ as libra_release
from libra.common.exc import DeletedStateError
from libra.common.faults import BadRequest
from libra.openstack.common import log
from libra.worker.drivers.base import LoadBalancerDriver
from libra.worker.drivers import base

LOG = log.getLogger(__name__)

@@ -71,8 +71,7 @@ class LBaaSController(object):
        elif action == 'ARCHIVE':
            return self._action_archive()
        elif action == 'STATS':
            # TODO: Implement new STATS function
            return self._action_ping()
            return self._action_stats()
        elif action == 'PING':
            return self._action_ping()
        elif action == 'DIAGNOSTICS':
@@ -203,15 +202,15 @@ class LBaaSController(object):
        if 'algorithm' in current_lb:
            algo = current_lb['algorithm'].upper()
            if algo == 'ROUND_ROBIN':
                algo = LoadBalancerDriver.ROUNDROBIN
                algo = base.LoadBalancerDriver.ROUNDROBIN
            elif algo == 'LEAST_CONNECTIONS':
                algo = LoadBalancerDriver.LEASTCONN
                algo = base.LoadBalancerDriver.LEASTCONN
            else:
                LOG.error("Invalid algorithm: %s" % algo)
                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
                return self.msg
        else:
            algo = LoadBalancerDriver.ROUNDROBIN
            algo = base.LoadBalancerDriver.ROUNDROBIN

        try:
            self.driver.set_algorithm(current_lb['protocol'], algo)
@@ -451,26 +450,69 @@ class LBaaSController(object):
        """

        try:
            stats = self.driver.get_status()
            nodes = self.driver.get_status()
        except NotImplementedError:
            error = "Selected driver does not support PING action."
            LOG.error(error)
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            self.msg[self.ERROR_FIELD] = error
        except DeletedStateError:
            LOG.info("Invalid operation PING on a deleted LB")
            error = "Invalid operation PING on a deleted LB."
            LOG.error(error)
            self.msg['status'] = 'DELETED'
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            self.msg[self.ERROR_FIELD] = error
        except Exception as e:
            LOG.error("PING failed: %s, %s" % (e.__class__, e))
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            self.msg[self.ERROR_FIELD] = str(e)
        else:
            node_status = stats.node_status_map()
            self.msg['nodes'] = []
            for node in node_status.keys():
                self.msg['nodes'].append({'id': node,
                                          'status': node_status[node]})
            for node, status in nodes:
                self.msg['nodes'].append({'id': node, 'status': status})
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS

        return self.msg

    def _action_stats(self):
        """
        Get load balancer statistics

        This type of request gets the number of bytes out for each load
        balancer defined on the device. If both a TCP and HTTP load
        balancer exist, we report on each in a single response.
        """

        try:
            start, end, statistics = self.driver.get_statistics()
        except NotImplementedError:
            error = "Selected driver does not support STATS action."
            LOG.error(error)
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            self.msg[self.ERROR_FIELD] = error
            return self.msg
        except DeletedStateError:
            error = "Invalid operation STATS on a deleted LB."
            LOG.error(error)
            self.msg['status'] = 'DELETED'
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            self.msg[self.ERROR_FIELD] = error
            return self.msg
        except Exception as e:
            LOG.error("STATS failed: %s, %s" % (e.__class__, e))
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            self.msg[self.ERROR_FIELD] = str(e)
            return self.msg

        self.msg['utc_start'] = start
        self.msg['utc_end'] = end
        self.msg['loadBalancers'] = []

        # We should have a list of tuples pairing the number of bytes
        # out with the protocol/LB.
        for proto, bytes_out in statistics:
            self.msg['loadBalancers'].append({'protocol': proto,
                                              'bytes_out': bytes_out})

        self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
        return self.msg
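For illustration only, a successful STATS reply built by _action_stats()
above would carry roughly this shape (timestamps and byte counts here are
invented; the success marker is whatever RESPONSE_FIELD/RESPONSE_SUCCESS
are defined as elsewhere in the controller):

    # Hypothetical reply for a device with both a TCP and an HTTP LB.
    msg = {
        'utc_start': '2013-01-31 12:10:30.123456',
        'utc_end': '2013-01-31 12:15:30.123456',
        'loadBalancers': [
            {'protocol': 'tcp', 'bytes_out': 1024},
            {'protocol': 'http', 'bytes_out': 2048},
        ],
    }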
@@ -97,7 +97,26 @@ class LoadBalancerDriver(object):
        raise NotImplementedError()

    def get_status(self, protocol):
        """ Get load balancer status for specified protocol. """
        """
        Get load balancer status for specified protocol.

        Returns a list of tuples containing (in this order):
            - node ID
            - node status
        """
        raise NotImplementedError()

    def get_statistics(self):
        """
        Get load balancer statistics for all LBs on the device.

        Returns a tuple containing (in this order):
            - start timestamp for the reporting period as a string
            - end timestamp for the reporting period as a string
            - list of tuples containing (in this order):
                - protocol for the LB ('tcp' or 'http') as a string
                - bytes out for this LB for this reporting period as an int
        """
        raise NotImplementedError()

    def archive(self, method, params):
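As a minimal sketch of the new driver contract (FakeDriver is a made-up
name, not part of the tree), a conforming get_statistics() could look like:

    import datetime

    class FakeDriver(LoadBalancerDriver):
        def get_statistics(self):
            start = datetime.datetime(2013, 1, 31, 12, 10, 30)
            end = start + datetime.timedelta(minutes=5)
            # (protocol, bytes out for the period) pairs, per the docstring
            return str(start), str(end), [('tcp', 1024), ('http', 2048)]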
@@ -26,6 +26,9 @@ cfg.CONF.register_opts(
        cfg.StrOpt('logfile',
                   default='/var/log/haproxy.log',
                   help='Location of HAProxy logfile'),
        cfg.StrOpt('statsfile',
                   default='/var/log/haproxy.stats',
                   help='Location of the HAProxy statistics cache file'),
    ],
    group=haproxy_group
)
@@ -396,6 +396,9 @@ class HAProxyDriver(LoadBalancerDriver):
    def get_status(self, protocol=None):
        return self.ossvc.get_status(protocol)

    def get_statistics(self):
        return self.ossvc.get_statistics()

    def archive(self, method, params):
        """
        Implementation of the archive() API call.

@@ -1,64 +0,0 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime


class LBStatistics(object):
    """ Load balancer statistics class. """

    def __init__(self):
        self.stats = {}
        self.bytes_out = 0L
        self.bytes_in = 0L
        self.nodes = dict()
        self.utc_timestamp = datetime.datetime.utcnow()

    @property
    def bytes_out(self):
        return self.stats['bytes_out']

    @bytes_out.setter
    def bytes_out(self, value):
        if not isinstance(value, long):
            raise TypeError("Must be a long integer: '%s'" % value)
        self.stats['bytes_out'] = value

    @property
    def bytes_in(self):
        return self.stats['bytes_in']

    @bytes_in.setter
    def bytes_in(self, value):
        if not isinstance(value, long):
            raise TypeError("Must be a long integer: '%s'" % value)
        self.stats['bytes_in'] = value

    @property
    def utc_timestamp(self):
        """ UTC timestamp for when these statistics are generated. """
        return self._utc_ts

    @utc_timestamp.setter
    def utc_timestamp(self, value):
        if not isinstance(value, datetime.datetime):
            raise TypeError("Must be a datetime.datetime: '%s'" % value)
        self._utc_ts = value

    def add_node_status(self, node, status):
        self.nodes[node] = status

    def node_status_map(self):
        """ Return a dictionary, indexed by node ID, of the node status """
        return self.nodes
@@ -16,7 +16,13 @@ import subprocess


class HAProxyQuery(object):
    """ Class used for querying the HAProxy statistics socket. """
    """
    Class used for querying the HAProxy statistics socket.

    The CSV output is defined in the HAProxy documentation:

        http://cbonte.github.io/haproxy-dconv/configuration-1.4.html#9
    """

    def __init__(self, stats_socket):
        """
@@ -59,8 +65,8 @@ class HAProxyQuery(object):
        object_type
            Select the type of dumpable object. Values can be ORed.
                -1 - everything
                 1 - backends
                 2 - frontents
                 1 - frontends
                 2 - backends
                 4 - servers

        server_id
@@ -71,6 +77,30 @@ class HAProxyQuery(object):
        list_results = results.split('\n')
        return list_results

    def get_bytes_out(self, protocol=None):
        """
        Get bytes out for the given protocol, or all protocols if
        not specified.

        Return a dictionary keyed by protocol with bytes out as the value.
        """
        if protocol:
            filter_string = protocol.lower() + "-servers"

        results = self.show_stat(object_type=2)  # backends only

        final_results = {}
        for line in results[1:]:
            elements = line.split(',')
            if protocol and elements[0] != filter_string:
                continue
            else:
                proto, ignore = elements[0].split('-')
                bytes_out = int(elements[9])
                final_results[proto.lower()] = bytes_out

        return final_results

    def get_server_status(self, protocol=None):
        """
        Get status for each server for a protocol backend.

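To make the parsing above concrete: each backend row of the HAProxy CSV
output carries the proxy name in column 0 and bytes out in column 9. The
sample line below is invented, with most columns elided:

    line = "http-servers,BACKEND,0,0,0,1,200,10,2730,2048"
    elements = line.split(',')
    proto, ignore = elements[0].split('-')  # 'http', 'servers'
    bytes_out = int(elements[9])            # 2048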
@@ -50,8 +50,12 @@ class ServicesBase:
        """ Remove current and saved HAProxy config files. """
        raise NotImplementedError()

    def get_stats(self):
        """ Get the stats from HAProxy. """
    def get_status(self, protocol):
        """ Get status from HAProxy. """
        raise NotImplementedError()

    def get_statistics(self):
        """ Get statistics from HAProxy. """
        raise NotImplementedError()

    def sudo_copy(self, from_file, to_file):

162  libra/worker/drivers/haproxy/stats.py  Normal file
@@ -0,0 +1,162 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import os.path
import simplejson


class StatisticsManager(object):
    """
    Class for managing statistics storage.

    Since HAProxy statistics are reset whenever the haproxy process is
    restarted, we need a reliable way of maintaining these values across
    restarts. This class attempts to manage the storage of the values.

    There are two types of statistics we record:

      * Unreported stats
        These are stats that we need to save because a state change in
        the HAProxy service is causing it to restart. Since HAProxy stores
        its stats in memory, they would otherwise be lost. We save them
        here for consideration in the next STATS request.

      * Last queried stats
        These are total bytes out as reported from HAProxy the last time
        we queried it for that information.
    """

    START_FIELD = 'start'
    END_FIELD = 'end'

    # UNREPORTED_* values are for unreported statistics due to a restart
    UNREPORTED_TCP_BYTES_FIELD = 'unreported_tcp_bytes_out'
    UNREPORTED_HTTP_BYTES_FIELD = 'unreported_http_bytes_out'

    # LAST_* values are for values from our last query
    LAST_TCP_BYTES_FIELD = 'last_tcp_bytes_out'
    LAST_HTTP_BYTES_FIELD = 'last_http_bytes_out'

    def __init__(self, filename):
        self.filename = filename
        self._object = {}
        self.read()

    def _do_save(self, obj):
        with open(self.filename, "w") as fp:
            simplejson.dump(obj, fp)

    def _format_timestamp(self, ts):
        return datetime.datetime.strptime(ts, '%Y-%m-%d %H:%M:%S.%f')

    def save(self, start, end, tcp_bytes=0, http_bytes=0,
             unreported_tcp_bytes=0, unreported_http_bytes=0):
        """
        Save HAProxy statistics values, overwriting any existing data.

        start
            Start timestamp from our last report.

        end
            End timestamp from our last report.

        tcp_bytes
            TOTAL bytes out of the TCP backend, as reported by haproxy,
            when we last reported them back.

        http_bytes
            TOTAL bytes out of the HTTP backend, as reported by haproxy,
            when we last reported them back.

        unreported_tcp_bytes
            TOTAL bytes out of the TCP backend, as reported by haproxy,
            when the service was stopped or restarted.

        unreported_http_bytes
            TOTAL bytes out of the HTTP backend, as reported by haproxy,
            when the service was stopped or restarted.
        """
        if None in [start, end]:
            raise Exception('Cannot save None value for timestamps')

        if type(start) != datetime.datetime or type(end) != datetime.datetime:
            raise TypeError('Timestamps must be datetime.datetime')

        obj = {
            self.START_FIELD: str(start),
            self.END_FIELD: str(end),
            self.LAST_TCP_BYTES_FIELD: tcp_bytes,
            self.LAST_HTTP_BYTES_FIELD: http_bytes,
            self.UNREPORTED_TCP_BYTES_FIELD: unreported_tcp_bytes,
            self.UNREPORTED_HTTP_BYTES_FIELD: unreported_http_bytes
        }
        self._do_save(obj)

    def read(self):
        """ Read the current values from the file """
        if not os.path.exists(self.filename):
            return
        with open(self.filename, "r") as fp:
            self._object = simplejson.load(fp)

    def get_start(self):
        """ Return last start timestamp as datetime object """
        if self.START_FIELD in self._object:
            return self._format_timestamp(self._object[self.START_FIELD])
        return None

    def get_end(self):
        """ Return last end timestamp as datetime object """
        if self.END_FIELD in self._object:
            return self._format_timestamp(self._object[self.END_FIELD])
        return None

    def get_unreported_tcp_bytes(self):
        """ Return TCP unreported bytes out """
        if self.UNREPORTED_TCP_BYTES_FIELD in self._object:
            return int(self._object[self.UNREPORTED_TCP_BYTES_FIELD])
        return 0

    def get_unreported_http_bytes(self):
        """ Return HTTP unreported bytes out """
        if self.UNREPORTED_HTTP_BYTES_FIELD in self._object:
            return int(self._object[self.UNREPORTED_HTTP_BYTES_FIELD])
        return 0

    def get_last_tcp_bytes(self):
        """ Return TCP last reported bytes out """
        if self.LAST_TCP_BYTES_FIELD in self._object:
            return int(self._object[self.LAST_TCP_BYTES_FIELD])
        return 0

    def get_last_http_bytes(self):
        """ Return HTTP last reported bytes out """
        if self.LAST_HTTP_BYTES_FIELD in self._object:
            return int(self._object[self.LAST_HTTP_BYTES_FIELD])
        return 0

    def calculate_new_start(self):
        """
        Calculate a new start value for our reporting time range,
        which should be just after the last reported end value. If
        there is no start value, then we haven't recorded one yet
        (i.e., haven't reported any stats yet) so use the current time.
        """
        new_start = self.get_end()
        if new_start is None:
            new_start = datetime.datetime.utcnow()
        else:
            new_start = new_start + datetime.timedelta(microseconds=1)
        return new_start
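A quick usage sketch of the class above (the file path is arbitrary):

    import datetime

    mgr = StatisticsManager('/tmp/haproxy.stats')
    start = datetime.datetime.utcnow()
    end = start + datetime.timedelta(minutes=5)
    mgr.save(start, end, tcp_bytes=1024, http_bytes=2048)

    mgr.read()
    assert mgr.get_last_tcp_bytes() == 1024
    # The next reporting window begins 1 microsecond past the saved end.
    assert mgr.calculate_new_start() == end + \
        datetime.timedelta(microseconds=1)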
@@ -12,16 +12,22 @@
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import os
import subprocess

from libra.common.exc import DeletedStateError
from libra.worker.drivers.haproxy.lbstats import LBStatistics
from libra.worker.drivers.haproxy.services_base import ServicesBase
from libra.worker.drivers.haproxy.query import HAProxyQuery
from oslo.config import cfg

from libra.common import exc
from libra.openstack.common import log
from libra.worker.drivers.haproxy import query
from libra.worker.drivers.haproxy import services_base
from libra.worker.drivers.haproxy import stats

LOG = log.getLogger(__name__)


class UbuntuServices(ServicesBase):
class UbuntuServices(services_base.ServicesBase):
    """ Ubuntu-specific service implementation. """

    def __init__(self):
@@ -29,6 +35,40 @@ class UbuntuServices(ServicesBase):
        self._config_file = '/etc/haproxy/haproxy.cfg'
        self._backup_config = self._config_file + '.BKUP'

    def _save_unreported(self):
        """
        Save current HAProxy totals for an expected restart.
        """
        q = query.HAProxyQuery('/var/run/haproxy-stats.socket')
        results = q.get_bytes_out()

        stats_file = cfg.CONF['worker:haproxy']['statsfile']
        stats_mgr = stats.StatisticsManager(stats_file)

        # need to carry over current values
        start = stats_mgr.get_start()
        end = stats_mgr.get_end()

        if None in [start, end]:
            start = datetime.datetime.utcnow()
            end = start

        tcp_bo = stats_mgr.get_last_tcp_bytes()
        http_bo = stats_mgr.get_last_http_bytes()

        curr_tcp_bo = 0
        curr_http_bo = 0
        if 'tcp' in results:
            curr_tcp_bo = results['tcp']
        if 'http' in results:
            curr_http_bo = results['http']

        stats_mgr.save(start, end,
                       tcp_bytes=tcp_bo,
                       http_bytes=http_bo,
                       unreported_tcp_bytes=curr_tcp_bo,
                       unreported_http_bytes=curr_http_bo)

    def syslog_restart(self):
        cmd = '/usr/bin/sudo -n /usr/sbin/service rsyslog restart'
        try:
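With the save() call above, the statsfile named in the [worker:haproxy]
config is just a flat JSON object; after a stop or reload it would hold
something like the following (values invented):

    {"start": "2013-01-31 12:10:30.123456",
     "end": "2013-01-31 12:15:30.123456",
     "last_tcp_bytes_out": 1024,
     "last_http_bytes_out": 2048,
     "unreported_tcp_bytes_out": 1500,
     "unreported_http_bytes_out": 3000}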
@@ -38,6 +78,8 @@ class UbuntuServices(ServicesBase):

    def service_stop(self):
        """ Stop the HAProxy service on the local machine. """
        self._save_unreported()

        cmd = '/usr/bin/sudo -n /usr/sbin/service haproxy stop'
        try:
            subprocess.check_output(cmd.split())
@@ -65,6 +107,8 @@ class UbuntuServices(ServicesBase):
        This assumes that /etc/init.d/haproxy is using the -sf option
        to the haproxy process.
        """
        self._save_unreported()

        cmd = '/usr/bin/sudo -n /usr/sbin/service haproxy reload'
        try:
            subprocess.check_output(cmd.split())
@@ -155,8 +199,7 @@ class UbuntuServices(ServicesBase):

        This function will query the HAProxy statistics socket and pull out
        the values that it needs for the given protocol (which equates to one
        load balancer). The values are stored in a LBStatistics object that
        will be returned to the caller.
        load balancer).

        The output of the socket query is in CSV format and defined here:

@@ -164,15 +207,76 @@ class UbuntuServices(ServicesBase):
        """

        if not os.path.exists(self._config_file):
            raise DeletedStateError("Load balancer is deleted.")
            raise exc.DeletedStateError("Load balancer is deleted.")
        if not os.path.exists(self._haproxy_pid):
            raise Exception("HAProxy is not running.")

        stats = LBStatistics()
        query = HAProxyQuery('/var/run/haproxy-stats.socket')
        q = query.HAProxyQuery('/var/run/haproxy-stats.socket')
        return q.get_server_status(protocol)

        node_status_list = query.get_server_status(protocol)
        for node, status in node_status_list:
            stats.add_node_status(node, status)
    def get_statistics(self):
        if not os.path.exists(self._config_file):
            raise exc.DeletedStateError("Load balancer is deleted.")
        if not os.path.exists(self._haproxy_pid):
            raise Exception("HAProxy is not running.")

        return stats
        q = query.HAProxyQuery('/var/run/haproxy-stats.socket')
        results = q.get_bytes_out()

        stats_file = cfg.CONF['worker:haproxy']['statsfile']
        stats_mgr = stats.StatisticsManager(stats_file)

        # date range for this report
        new_start = stats_mgr.calculate_new_start()
        new_end = datetime.datetime.utcnow()

        # previously recorded totals
        prev_tcp_bo = stats_mgr.get_last_tcp_bytes()
        prev_http_bo = stats_mgr.get_last_http_bytes()
        unrpt_tcp_bo = stats_mgr.get_unreported_tcp_bytes()
        unrpt_http_bo = stats_mgr.get_unreported_http_bytes()

        # current totals
        current_tcp_bo = 0
        current_http_bo = 0
        if 'http' in results:
            current_http_bo = results['http']
        if 'tcp' in results:
            current_tcp_bo = results['tcp']

        # If our totals that we previously recorded are greater than the
        # totals we have now, and no unreported values, then somehow HAProxy
        # was restarted outside of the worker's control, so we have no choice
        # but to zero the values to avoid overcharging on usage.
        if (unrpt_tcp_bo == 0 and unrpt_http_bo == 0) and \
                ((prev_tcp_bo > current_tcp_bo) or
                 (prev_http_bo > current_http_bo)):
            LOG.warn("Forced reset of HAProxy statistics")
            prev_tcp_bo = 0
            prev_http_bo = 0

        # Record totals for each protocol for comparison in the next request.
        stats_mgr.save(new_start, new_end,
                       tcp_bytes=current_tcp_bo,
                       http_bytes=current_http_bo)

        # We are to deliver the number of bytes out since our last report,
        # not the total, so calculate that here. Some examples:
        #
        # unreported total(A) | prev total(B) | current(C) | returned value
        #                     |               |            |   A + C - B
        # --------------------+---------------+------------+---------------
        #          0          |       0       |    200     |      200
        #          0          |      200      |    1500    |     1300
        #        2000         |      1500     |    100     |      600
        incremental_results = []
        if 'http' in results:
            incremental_results.append(
                ('http', unrpt_http_bo + current_http_bo - prev_http_bo)
            )
        if 'tcp' in results:
            incremental_results.append(
                ('tcp', unrpt_tcp_bo + current_tcp_bo - prev_tcp_bo)
            )

        return str(new_start), str(new_end), incremental_results
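The delta being reported reduces to A + C - B per protocol: the unreported
total, plus the current counter, minus the total already reported. A
standalone check of that arithmetic against the table above (the helper
name is illustrative only):

    def incremental(unreported, prev_total, current):
        # bytes to report this period: counters lost to a restart, plus
        # the live counter, minus what was already reported last time
        return unreported + current - prev_total

    assert incremental(0, 0, 200) == 200
    assert incremental(0, 200, 1500) == 1300
    assert incremental(2000, 1500, 100) == 600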