From bbd4eb0ad061c336f35544c2ad946aa7f6cee6b6 Mon Sep 17 00:00:00 2001 From: David Shrewsbury Date: Mon, 11 Nov 2013 10:10:26 -0500 Subject: [PATCH] [WORKER] Implement new STATS request STATS requests will now return usage information for each LB defined on a device (TCP, HTTP, or both). This change introduces a new StatisticsManager class for managing temporary storage of HAProxy statistics to deal with the fact that its reported values are reset on restart. Also add tests for the new class. Removed the LBStatistics class since it wasn't necessary. Change-Id: I56177a829650b2206ee855fdf4756ed52825e936 --- etc/libra.cfg | 6 +- libra/tests/worker/test_lbstats.py | 55 ------ libra/tests/worker/test_stats.py | 90 ++++++++++ libra/worker/controller.py | 66 +++++-- libra/worker/drivers/base.py | 21 ++- libra/worker/drivers/haproxy/__init__.py | 3 + libra/worker/drivers/haproxy/driver.py | 3 + libra/worker/drivers/haproxy/lbstats.py | 64 ------- libra/worker/drivers/haproxy/query.py | 36 +++- libra/worker/drivers/haproxy/services_base.py | 8 +- libra/worker/drivers/haproxy/stats.py | 162 ++++++++++++++++++ .../worker/drivers/haproxy/ubuntu_services.py | 132 ++++++++++++-- 12 files changed, 490 insertions(+), 156 deletions(-) delete mode 100644 libra/tests/worker/test_lbstats.py create mode 100644 libra/tests/worker/test_stats.py delete mode 100644 libra/worker/drivers/haproxy/lbstats.py create mode 100644 libra/worker/drivers/haproxy/stats.py diff --git a/etc/libra.cfg b/etc/libra.cfg index 16c28159..72e03a7b 100644 --- a/etc/libra.cfg +++ b/etc/libra.cfg @@ -51,12 +51,11 @@ #driver = haproxy #pid = /var/run/libra/libra_worker.pid -#logfile = /var/log/libra/libra_worker.log # HAProxy driver options for the worker [worker:haproxy] #service = ubuntu -#logfile = /var/log/haproxy.log +#statsfile = /var/log/haproxy.stats #----------------------------------------------------------------------- @@ -67,7 +66,6 @@ # Options with defaults #pid = /var/run/libra/libra_mgm.pid 
-#logfile = /var/log/libra/libra_mgm.log #threads = 4 #rm_fip_ignore_500 = false #nova_insecure = false @@ -101,7 +99,6 @@ nova_tenant_id = TENANTID # Options with defaults #host = 0.0.0.0 #port = 8889 -#logfile = /var/log/libra/libra_admin_api.log #pid = /var/run/libra/libra_admin_api.pid #expire_days = 0 #node_pool_size = 10 @@ -140,7 +137,6 @@ datadog_tags = service:lbaas #host = 0.0.0.0 #port = 443 #keystone_module = keystoneclient.middleware.auth_token:AuthProtocol -#logfile = /var/log/libra/libra_api.log #pid = /var/run/libra/libra_api.pid # Required options diff --git a/libra/tests/worker/test_lbstats.py b/libra/tests/worker/test_lbstats.py deleted file mode 100644 index 48983845..00000000 --- a/libra/tests/worker/test_lbstats.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -import datetime -from libra.tests.base import TestCase -from libra.worker.drivers.haproxy.lbstats import LBStatistics - - -class TestLBStatistics(TestCase): - def setUp(self): - super(TestLBStatistics, self).setUp() - self.stats = LBStatistics() - - def testInitValues(self): - now = datetime.datetime.utcnow() - ts = self.stats.utc_timestamp - self.assertEquals(ts.year, now.year) - self.assertEquals(ts.month, now.month) - self.assertEquals(ts.day, now.day) - self.assertEquals(ts.hour, now.hour) - self.assertEquals(self.stats.bytes_out, 0L) - self.assertEquals(self.stats.bytes_in, 0L) - - def testSetBytesIn(self): - self.stats.bytes_in = 99L - self.assertEquals(self.stats.bytes_in, 99L) - e = self.assertRaises(TypeError, setattr, self.stats, - 'bytes_in', "NaN") - self.assertEqual("Must be a long integer: 'NaN'", e.message) - - def testSetBytesOut(self): - self.stats.bytes_out = 100L - self.assertEquals(self.stats.bytes_out, 100L) - e = self.assertRaises(TypeError, setattr, self.stats, - 'bytes_out', "NaN") - self.assertEqual("Must be a long integer: 'NaN'", e.message) - - def testSetUTCTimestamp(self): - ts = datetime.datetime.utcnow() - self.stats.utc_timestamp = ts - self.assertEquals(self.stats.utc_timestamp, ts) - e = self.assertRaises(TypeError, setattr, self.stats, - 'utc_timestamp', "NaN") - self.assertEqual("Must be a datetime.datetime: 'NaN'", e.message) diff --git a/libra/tests/worker/test_stats.py b/libra/tests/worker/test_stats.py new file mode 100644 index 00000000..bc86f31d --- /dev/null +++ b/libra/tests/worker/test_stats.py @@ -0,0 +1,90 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import os.path +import tempfile + +from libra.tests.base import TestCase +from libra.worker.drivers.haproxy.stats import StatisticsManager + + +class TestStatisticsManager(TestCase): + + def setUp(self): + super(TestStatisticsManager, self).setUp() + self.tmpfile = tempfile.gettempdir() + "/tstLibraTestStatsMgr.tmp" + self.mgr = StatisticsManager(self.tmpfile) + + def tearDown(self): + if os.path.exists(self.tmpfile): + os.remove(self.tmpfile) + super(TestStatisticsManager, self).tearDown() + + def testReadNoStatsFile(self): + self.assertEquals(self.mgr.get_start(), None) + self.assertEquals(self.mgr.get_end(), None) + self.assertEquals(self.mgr.get_last_tcp_bytes(), 0) + self.assertEquals(self.mgr.get_last_http_bytes(), 0) + self.assertEquals(self.mgr.get_unreported_tcp_bytes(), 0) + self.assertEquals(self.mgr.get_unreported_http_bytes(), 0) + + def testSave(self): + start_ts = datetime.datetime(2013, 1, 31, 12, 10, 30, 123456) + end_ts = start_ts + datetime.timedelta(minutes=5) + tcp_bytes = 1024 + http_bytes = 2048 + unreported_tcp_bytes = 3000 + unreported_http_bytes = 4000 + + self.mgr.save(start_ts, end_ts, + tcp_bytes=tcp_bytes, http_bytes=http_bytes) + self.mgr.read() + + self.assertEquals(self.mgr.get_start(), start_ts) + self.assertEquals(self.mgr.get_end(), end_ts) + self.assertEquals(self.mgr.get_last_tcp_bytes(), tcp_bytes) + self.assertEquals(self.mgr.get_last_http_bytes(), http_bytes) + self.assertEquals(self.mgr.get_unreported_tcp_bytes(), 0) + self.assertEquals(self.mgr.get_unreported_http_bytes(), 0) + + 
self.mgr.save(start_ts, end_ts, + unreported_tcp_bytes=unreported_tcp_bytes, + unreported_http_bytes=unreported_http_bytes) + self.mgr.read() + + self.assertEquals(self.mgr.get_start(), start_ts) + self.assertEquals(self.mgr.get_end(), end_ts) + self.assertEquals(self.mgr.get_last_tcp_bytes(), 0) + self.assertEquals(self.mgr.get_last_http_bytes(), 0) + self.assertEquals(self.mgr.get_unreported_tcp_bytes(), + unreported_tcp_bytes) + self.assertEquals(self.mgr.get_unreported_http_bytes(), + unreported_http_bytes) + + self.mgr.save(start_ts, end_ts, + tcp_bytes=tcp_bytes, + http_bytes=http_bytes, + unreported_tcp_bytes=unreported_tcp_bytes, + unreported_http_bytes=unreported_http_bytes) + self.mgr.read() + + self.assertEquals(self.mgr.get_start(), start_ts) + self.assertEquals(self.mgr.get_end(), end_ts) + self.assertEquals(self.mgr.get_last_tcp_bytes(), tcp_bytes) + self.assertEquals(self.mgr.get_last_http_bytes(), http_bytes) + self.assertEquals(self.mgr.get_unreported_tcp_bytes(), + unreported_tcp_bytes) + self.assertEquals(self.mgr.get_unreported_http_bytes(), + unreported_http_bytes) diff --git a/libra/worker/controller.py b/libra/worker/controller.py index a39b1041..3681d9f1 100644 --- a/libra/worker/controller.py +++ b/libra/worker/controller.py @@ -21,7 +21,7 @@ from libra import __release__ as libra_release from libra.common.exc import DeletedStateError from libra.common.faults import BadRequest from libra.openstack.common import log -from libra.worker.drivers.base import LoadBalancerDriver +from libra.worker.drivers import base LOG = log.getLogger(__name__) @@ -71,8 +71,7 @@ class LBaaSController(object): elif action == 'ARCHIVE': return self._action_archive() elif action == 'STATS': - # TODO: Implement new STATS function - return self._action_ping() + return self._action_stats() elif action == 'PING': return self._action_ping() elif action == 'DIAGNOSTICS': @@ -203,15 +202,15 @@ class LBaaSController(object): if 'algorithm' in current_lb: algo = 
current_lb['algorithm'].upper() if algo == 'ROUND_ROBIN': - algo = LoadBalancerDriver.ROUNDROBIN + algo = base.LoadBalancerDriver.ROUNDROBIN elif algo == 'LEAST_CONNECTIONS': - algo = LoadBalancerDriver.LEASTCONN + algo = base.LoadBalancerDriver.LEASTCONN else: LOG.error("Invalid algorithm: %s" % algo) self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE return self.msg else: - algo = LoadBalancerDriver.ROUNDROBIN + algo = base.LoadBalancerDriver.ROUNDROBIN try: self.driver.set_algorithm(current_lb['protocol'], algo) @@ -451,26 +450,69 @@ class LBaaSController(object): """ try: - stats = self.driver.get_status() + nodes = self.driver.get_status() except NotImplementedError: error = "Selected driver does not support PING action." LOG.error(error) self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE self.msg[self.ERROR_FIELD] = error except DeletedStateError: - LOG.info("Invalid operation PING on a deleted LB") + error = "Invalid operation PING on a deleted LB." + LOG.error(error) self.msg['status'] = 'DELETED' self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + self.msg[self.ERROR_FIELD] = error except Exception as e: LOG.error("PING failed: %s, %s" % (e.__class__, e)) self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE self.msg[self.ERROR_FIELD] = str(e) else: - node_status = stats.node_status_map() self.msg['nodes'] = [] - for node in node_status.keys(): - self.msg['nodes'].append({'id': node, - 'status': node_status[node]}) + for node, status in nodes: + self.msg['nodes'].append({'id': node, 'status': status}) self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS return self.msg + + def _action_stats(self): + """ + Get load balancer statistics + + This type of request gets the number of bytes out for each load + balancer defined on the device. If both a TCP and HTTP load + balancer exist, we report on each in a single response. 
+ """ + + try: + start, end, statistics = self.driver.get_statistics() + except NotImplementedError: + error = "Selected driver does not support STATS action." + LOG.error(error) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + self.msg[self.ERROR_FIELD] = error + return self.msg + except DeletedStateError: + error = "Invalid operation STATS on a deleted LB." + LOG.error(error) + self.msg['status'] = 'DELETED' + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + self.msg[self.ERROR_FIELD] = error + return self.msg + except Exception as e: + LOG.error("STATS failed: %s, %s" % (e.__class__, e)) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + self.msg[self.ERROR_FIELD] = str(e) + return self.msg + + self.msg['utc_start'] = start + self.msg['utc_end'] = end + self.msg['loadBalancers'] = [] + + # We should have a list of tuples pairing the number of bytes + # out with the protocol/LB. + for proto, bytes_out in statistics: + self.msg['loadBalancers'].append({'protocol': proto, + 'bytes_out': bytes_out}) + + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS + return self.msg diff --git a/libra/worker/drivers/base.py b/libra/worker/drivers/base.py index 003997ac..35f919cf 100644 --- a/libra/worker/drivers/base.py +++ b/libra/worker/drivers/base.py @@ -97,7 +97,26 @@ class LoadBalancerDriver(object): raise NotImplementedError() def get_status(self, protocol): - """ Get load balancer status for specified protocol. """ + """ + Get load balancer status for specified protocol. + + Returns a list of tuples containing (in this order): + - node ID + - node status + """ + raise NotImplementedError() + + def get_statistics(self): + """ + Get load balancer statistics for all LBs on the device. 
+ + Returns a tuple containing (in this order): + - start timestamp for the reporting period as a string + - end timestamp for the reporting period as a string + - list of tuples containing (in this order): + - protocol for the LB ('tcp' or 'http') as a string + - bytes out for this LB for this reporting period as an int + """ raise NotImplementedError() def archive(self, method, params): diff --git a/libra/worker/drivers/haproxy/__init__.py b/libra/worker/drivers/haproxy/__init__.py index c69799f0..db101aaa 100644 --- a/libra/worker/drivers/haproxy/__init__.py +++ b/libra/worker/drivers/haproxy/__init__.py @@ -26,6 +26,9 @@ cfg.CONF.register_opts( cfg.StrOpt('logfile', default='/var/log/haproxy.log', help='Location of HAProxy logfile'), + cfg.StrOpt('statsfile', + default='/var/log/haproxy.stats', + help='Location of the HAProxy statistics cache file'), ], group=haproxy_group ) diff --git a/libra/worker/drivers/haproxy/driver.py b/libra/worker/drivers/haproxy/driver.py index a5be917b..9082ec7d 100644 --- a/libra/worker/drivers/haproxy/driver.py +++ b/libra/worker/drivers/haproxy/driver.py @@ -396,6 +396,9 @@ class HAProxyDriver(LoadBalancerDriver): def get_status(self, protocol=None): return self.ossvc.get_status(protocol) + def get_statistics(self): + return self.ossvc.get_statistics() + def archive(self, method, params): """ Implementation of the archive() API call. diff --git a/libra/worker/drivers/haproxy/lbstats.py b/libra/worker/drivers/haproxy/lbstats.py deleted file mode 100644 index faee48f7..00000000 --- a/libra/worker/drivers/haproxy/lbstats.py +++ /dev/null @@ -1,64 +0,0 @@ -# Copyright 2012 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. 
You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime - - -class LBStatistics(object): - """ Load balancer statistics class. """ - - def __init__(self): - self.stats = {} - self.bytes_out = 0L - self.bytes_in = 0L - self.nodes = dict() - self.utc_timestamp = datetime.datetime.utcnow() - - @property - def bytes_out(self): - return self.stats['bytes_out'] - - @bytes_out.setter - def bytes_out(self, value): - if not isinstance(value, long): - raise TypeError("Must be a long integer: '%s'" % value) - self.stats['bytes_out'] = value - - @property - def bytes_in(self): - return self.stats['bytes_in'] - - @bytes_in.setter - def bytes_in(self, value): - if not isinstance(value, long): - raise TypeError("Must be a long integer: '%s'" % value) - self.stats['bytes_in'] = value - - @property - def utc_timestamp(self): - """ UTC timestamp for when these statistics are generated. 
""" - return self._utc_ts - - @utc_timestamp.setter - def utc_timestamp(self, value): - if not isinstance(value, datetime.datetime): - raise TypeError("Must be a datetime.datetime: '%s'" % value) - self._utc_ts = value - - def add_node_status(self, node, status): - self.nodes[node] = status - - def node_status_map(self): - """ Return a dictionary, indexed by node ID, of the node status """ - return self.nodes diff --git a/libra/worker/drivers/haproxy/query.py b/libra/worker/drivers/haproxy/query.py index cd99ef7a..60a7502f 100644 --- a/libra/worker/drivers/haproxy/query.py +++ b/libra/worker/drivers/haproxy/query.py @@ -16,7 +16,13 @@ import subprocess class HAProxyQuery(object): - """ Class used for querying the HAProxy statistics socket. """ + """ + Class used for querying the HAProxy statistics socket. + + The CSV output is defined in the HAProxy documentation: + + http://cbonte.github.io/haproxy-dconv/configuration-1.4.html#9 + """ def __init__(self, stats_socket): """ @@ -59,8 +65,8 @@ class HAProxyQuery(object): object_type Select the type of dumpable object. Values can be ORed. -1 - everything - 1 - backends - 2 - frontents + 1 - frontends + 2 - backends 4 - servers server_id @@ -71,6 +77,30 @@ class HAProxyQuery(object): list_results = results.split('\n') return list_results + def get_bytes_out(self, protocol=None): + """ + Get bytes out for the given protocol, or all protocols if + not specified. + + Return a dictionary keyed by protocol with bytes out as the value. 
+ """ + if protocol: + filter_string = protocol.lower() + "-servers" + + results = self.show_stat(object_type=2) # backends only + + final_results = {} + for line in results[1:]: + elements = line.split(',') + if protocol and elements[0] != filter_string: + continue + else: + proto, ignore = elements[0].split('-') + bytes_out = int(elements[9]) + final_results[proto.lower()] = bytes_out + + return final_results + def get_server_status(self, protocol=None): """ Get status for each server for a protocol backend. diff --git a/libra/worker/drivers/haproxy/services_base.py b/libra/worker/drivers/haproxy/services_base.py index 9baf14d8..2e8b85b9 100644 --- a/libra/worker/drivers/haproxy/services_base.py +++ b/libra/worker/drivers/haproxy/services_base.py @@ -50,8 +50,12 @@ class ServicesBase: """ Remove current and saved HAProxy config files. """ raise NotImplementedError() - def get_stats(self): - """ Get the stats from HAProxy. """ + def get_status(self, protocol): + """ Get status from HAProxy. """ + raise NotImplementedError() + + def get_statistics(self): + """ Get statistics from HAProxy. """ + raise NotImplementedError() def sudo_copy(self, from_file, to_file): diff --git a/libra/worker/drivers/haproxy/stats.py b/libra/worker/drivers/haproxy/stats.py new file mode 100644 index 00000000..4238d964 --- /dev/null +++ b/libra/worker/drivers/haproxy/stats.py @@ -0,0 +1,162 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import os.path +import simplejson + + +class StatisticsManager(object): + """ + Class for managing statistics storage. + + Since HAProxy statistics are reset whenever the haproxy process is + restarted, we need a reliable way of maintaining these values across + restarts. This class attempts to manage the storage of the values. + + There are two types of statistics we record: + + * Unreported stats + These are stats that we need to save because a state change in + the HAProxy service is causing it to restart. Since HAProxy stores + its stats in memory, they would otherwise be lost. We save them here + for consideration in the next STATS request. + + * Last queried stats + These are total bytes out as reported from HAProxy the last time we + queried it for that information. + """ + + START_FIELD = 'start' + END_FIELD = 'end' + + # UNREPORTED_* values are for unreported statistics due to a restart + UNREPORTED_TCP_BYTES_FIELD = 'unreported_tcp_bytes_out' + UNREPORTED_HTTP_BYTES_FIELD = 'unreported_http_bytes_out' + + # LAST_* values are for values from our last query + LAST_TCP_BYTES_FIELD = 'last_tcp_bytes_out' + LAST_HTTP_BYTES_FIELD = 'last_http_bytes_out' + + def __init__(self, filename): + self.filename = filename + self._object = {} + self.read() + + def _do_save(self, obj): + with open(self.filename, "w") as fp: + simplejson.dump(obj, fp) + + def _format_timestamp(self, ts): + return datetime.datetime.strptime(ts, '%Y-%m-%d %H:%M:%S.%f') + + def save(self, start, end, tcp_bytes=0, http_bytes=0, + unreported_tcp_bytes=0, unreported_http_bytes=0): + """ + Save HAProxy statistics values, overwriting any existing data. + + start + Start timestamp from our last report. + + end + End timestamp from our last report. + + tcp_bytes + TOTAL bytes out of the TCP backend, as reported by haproxy, + when we last reported them back. 
+ + http_bytes + TOTAL bytes out of the HTTP backend, as reported by haproxy, + when we last reported them back. + + unreported_tcp_bytes + TOTAL bytes out of the TCP backend, as reported by haproxy, + when the service was stopped or restarted. + + unreported_http_bytes + TOTAL bytes out of the HTTP backend, as reported by haproxy, + when the service was stopped or restarted. + """ + if None in [start, end]: + raise Exception('Cannot save None value for timestamps') + + if type(start) != datetime.datetime or type(end) != datetime.datetime: + raise TypeError('Timestamps must be datetime.datetime') + + obj = { + self.START_FIELD: str(start), + self.END_FIELD: str(end), + self.LAST_TCP_BYTES_FIELD: tcp_bytes, + self.LAST_HTTP_BYTES_FIELD: http_bytes, + self.UNREPORTED_TCP_BYTES_FIELD: unreported_tcp_bytes, + self.UNREPORTED_HTTP_BYTES_FIELD: unreported_http_bytes + } + self._do_save(obj) + + def read(self): + """ Read the current values from the file """ + if not os.path.exists(self.filename): + return + with open(self.filename, "r") as fp: + self._object = simplejson.load(fp) + + def get_start(self): + """ Return last start timestamp as datetime object """ + if self.START_FIELD in self._object: + return self._format_timestamp(self._object[self.START_FIELD]) + return None + + def get_end(self): + """ Return last end timestamp as datetime object """ + if self.END_FIELD in self._object: + return self._format_timestamp(self._object[self.END_FIELD]) + return None + + def get_unreported_tcp_bytes(self): + """ Return TCP unreported bytes out """ + if self.UNREPORTED_TCP_BYTES_FIELD in self._object: + return int(self._object[self.UNREPORTED_TCP_BYTES_FIELD]) + return 0 + + def get_unreported_http_bytes(self): + """ Return HTTP unreported bytes out """ + if self.UNREPORTED_HTTP_BYTES_FIELD in self._object: + return int(self._object[self.UNREPORTED_HTTP_BYTES_FIELD]) + return 0 + + def get_last_tcp_bytes(self): + """ Return TCP last reported bytes out """ + if 
self.LAST_TCP_BYTES_FIELD in self._object: + return int(self._object[self.LAST_TCP_BYTES_FIELD]) + return 0 + + def get_last_http_bytes(self): + """ Return HTTP last reported bytes out """ + if self.LAST_HTTP_BYTES_FIELD in self._object: + return int(self._object[self.LAST_HTTP_BYTES_FIELD]) + return 0 + + def calculate_new_start(self): + """ + Calculate a new start value for our reporting time range, + which should be just after the last reported end value. If + there is no start value, then we haven't recorded one yet + (i.e., haven't reported any stats yet) so use the current time. + """ + new_start = self.get_end() + if new_start is None: + new_start = datetime.datetime.utcnow() + else: + new_start = new_start + datetime.timedelta(microseconds=1) + return new_start diff --git a/libra/worker/drivers/haproxy/ubuntu_services.py b/libra/worker/drivers/haproxy/ubuntu_services.py index 8e9b9bab..e028de00 100644 --- a/libra/worker/drivers/haproxy/ubuntu_services.py +++ b/libra/worker/drivers/haproxy/ubuntu_services.py @@ -12,16 +12,22 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime import os import subprocess -from libra.common.exc import DeletedStateError -from libra.worker.drivers.haproxy.lbstats import LBStatistics -from libra.worker.drivers.haproxy.services_base import ServicesBase -from libra.worker.drivers.haproxy.query import HAProxyQuery +from oslo.config import cfg + +from libra.common import exc +from libra.openstack.common import log +from libra.worker.drivers.haproxy import query +from libra.worker.drivers.haproxy import services_base +from libra.worker.drivers.haproxy import stats + +LOG = log.getLogger(__name__) -class UbuntuServices(ServicesBase): +class UbuntuServices(services_base.ServicesBase): """ Ubuntu-specific service implementation. 
""" def __init__(self): @@ -29,6 +35,40 @@ class UbuntuServices(ServicesBase): self._config_file = '/etc/haproxy/haproxy.cfg' self._backup_config = self._config_file + '.BKUP' + def _save_unreported(self): + """ + Save current HAProxy totals for an expected restart. + """ + q = query.HAProxyQuery('/var/run/haproxy-stats.socket') + results = q.get_bytes_out() + + stats_file = cfg.CONF['worker:haproxy']['statsfile'] + stats_mgr = stats.StatisticsManager(stats_file) + + # need to carry over current values + start = stats_mgr.get_start() + end = stats_mgr.get_end() + + if None in [start, end]: + start = datetime.datetime.utcnow() + end = start + + tcp_bo = stats_mgr.get_last_tcp_bytes() + http_bo = stats_mgr.get_last_http_bytes() + + curr_tcp_bo = 0 + curr_http_bo = 0 + if 'tcp' in results: + curr_tcp_bo = results['tcp'] + if 'http' in results: + curr_http_bo = results['http'] + + stats_mgr.save(start, end, + tcp_bytes=tcp_bo, + http_bytes=http_bo, + unreported_tcp_bytes=curr_tcp_bo, + unreported_http_bytes=curr_http_bo) + def syslog_restart(self): cmd = '/usr/bin/sudo -n /usr/sbin/service rsyslog restart' try: @@ -38,6 +78,8 @@ class UbuntuServices(ServicesBase): def service_stop(self): """ Stop the HAProxy service on the local machine. """ + self._save_unreported() + cmd = '/usr/bin/sudo -n /usr/sbin/service haproxy stop' try: subprocess.check_output(cmd.split()) @@ -65,6 +107,8 @@ class UbuntuServices(ServicesBase): This assumes that /etc/init.d/haproxy is using the -sf option to the haproxy process. """ + self._save_unreported() + cmd = '/usr/bin/sudo -n /usr/sbin/service haproxy reload' try: subprocess.check_output(cmd.split()) @@ -155,8 +199,7 @@ class UbuntuServices(ServicesBase): This function will query the HAProxy statistics socket and pull out the values that it needs for the given protocol (which equates to one - load balancer). The values are stored in a LBStatistics object that - will be returned to the caller. + load balancer). 
The output of the socket query is in CSV format and defined here: @@ -164,15 +207,76 @@ class UbuntuServices(ServicesBase): """ if not os.path.exists(self._config_file): - raise DeletedStateError("Load balancer is deleted.") + raise exc.DeletedStateError("Load balancer is deleted.") if not os.path.exists(self._haproxy_pid): raise Exception("HAProxy is not running.") - stats = LBStatistics() - query = HAProxyQuery('/var/run/haproxy-stats.socket') + q = query.HAProxyQuery('/var/run/haproxy-stats.socket') + return q.get_server_status(protocol) - node_status_list = query.get_server_status(protocol) - for node, status in node_status_list: - stats.add_node_status(node, status) + def get_statistics(self): + if not os.path.exists(self._config_file): + raise exc.DeletedStateError("Load balancer is deleted.") + if not os.path.exists(self._haproxy_pid): + raise Exception("HAProxy is not running.") - return stats + q = query.HAProxyQuery('/var/run/haproxy-stats.socket') + results = q.get_bytes_out() + + stats_file = cfg.CONF['worker:haproxy']['statsfile'] + stats_mgr = stats.StatisticsManager(stats_file) + + # date range for this report + new_start = stats_mgr.calculate_new_start() + new_end = datetime.datetime.utcnow() + + # previously recorded totals + prev_tcp_bo = stats_mgr.get_last_tcp_bytes() + prev_http_bo = stats_mgr.get_last_http_bytes() + unrpt_tcp_bo = stats_mgr.get_unreported_tcp_bytes() + unrpt_http_bo = stats_mgr.get_unreported_http_bytes() + + # current totals + current_tcp_bo = 0 + current_http_bo = 0 + if 'http' in results: + current_http_bo = results['http'] + if 'tcp' in results: + current_tcp_bo = results['tcp'] + + # If our totals that we previously recorded are greater than the + # totals we have now, and no unreported values, then somehow HAProxy + # was restarted outside of the worker's control, so we have no choice + # but to zero the values to avoid overcharging on usage. 
+ if (unrpt_tcp_bo == 0 and unrpt_http_bo == 0) and \ + ((prev_tcp_bo > current_tcp_bo) or (prev_http_bo > current_http_bo)): + LOG.warn("Forced reset of HAProxy statistics") + prev_tcp_bo = 0 + prev_http_bo = 0 + + # Record totals for each protocol for comparison in the next request. + stats_mgr.save(new_start, new_end, + tcp_bytes=current_tcp_bo, + http_bytes=current_http_bo) + + # We are to deliver the number of bytes out since our last report, + # not the total, so calculate that here. Some examples: + # + # unreported total(A) | prev total(B) | current(C) | returned value + # | | | A + C - B + # --------------------+---------------+------------+--------------- + # 0 | 0 | 200 | 200 + # 0 | 200 | 1500 | 1300 + # 2000 | 1500 | 100 | 600 + + incremental_results = [] + if 'http' in results: + incremental_results.append( + ('http', unrpt_http_bo + current_http_bo - prev_http_bo) + ) + if 'tcp' in results: + incremental_results.append( + ('tcp', unrpt_tcp_bo + current_tcp_bo - prev_tcp_bo) + ) + + return str(new_start), str(new_end), incremental_results