For for stats thread when haproxy not running.
Fixes bug #1078773 The worker stats thread now checks to see if haproxy is running before attempting to get stats from it. If it isn't running, a ServiceUnavailable exception is thrown and it goes back to sleep until the next polling interval. Also rename config_manager and stats_manager functions to config_thread and stats_thread to make their purpose a bit clearer. Change-Id: I01e2c86bcac81a00027d4a7201feb922c6e72f3b
This commit is contained in:
@@ -7,7 +7,7 @@ Gearman Worker Thread
|
||||
---------------------
|
||||
.. py:module:: libra.worker.worker
|
||||
|
||||
.. py:function:: config_manager(logger, driver, servers, reconnect_sleep)
|
||||
.. py:function:: config_thread(logger, driver, servers, reconnect_sleep)
|
||||
|
||||
This function encapsulates the functionality for the Gearman worker thread
|
||||
that will be started by the :py:class:`~libra.worker.main.EventServer`
|
||||
@@ -49,7 +49,7 @@ LBaaSController Class
|
||||
.. py:class:: LBaaSController(logger, driver, json_msg)
|
||||
|
||||
This class is used by the Gearman task started within the worker thread
|
||||
(the :py:func:`~libra.worker.worker.config_manager` function) to drive the
|
||||
(the :py:func:`~libra.worker.worker.config_thread` function) to drive the
|
||||
Gearman message handling.
|
||||
|
||||
.. py:method:: run()
|
||||
@@ -132,13 +132,13 @@ The steps shown above are:
|
||||
.. py:module:: libra.worker
|
||||
|
||||
* The Gearman worker task used in the worker thread (see the
|
||||
:py:func:`~worker.config_manager` function), is run when the worker
|
||||
:py:func:`~worker.config_thread` function), is run when the worker
|
||||
receives a message from the Gearman job server (not represented above).
|
||||
* This task then uses the :py:class:`~controller.LBaaSController` to process
|
||||
the message that it received.
|
||||
* Based on the contents of the message, the controller then makes the relevant
|
||||
driver API calls using the :py:class:`~drivers.LoadBalancerDriver` driver
|
||||
that was selected via the :option:`--driver <libra_worker.py --driver>`
|
||||
that was selected via the :option:`--driver <libra_worker.py --driver>`
|
||||
option.
|
||||
* The driver executes the API call. If the driver encounters an error during
|
||||
execution, an exception is thrown that should be handled by the
|
||||
|
||||
17
libra/common/exc.py
Normal file
17
libra/common/exc.py
Normal file
@@ -0,0 +1,17 @@
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class ServiceUnavailable(Exception):
|
||||
pass
|
||||
@@ -16,6 +16,7 @@ import csv
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
from libra.common.exc import ServiceUnavailable
|
||||
from libra.common.lbstats import LBStatistics
|
||||
from libra.worker.drivers.haproxy.services_base import ServicesBase
|
||||
|
||||
@@ -118,6 +119,9 @@ class UbuntuServices(ServicesBase):
|
||||
http://cbonte.github.com/haproxy-dconv/configuration-1.4.html#9
|
||||
"""
|
||||
|
||||
if not os.path.exists(self._haproxy_pid):
|
||||
raise ServiceUnavailable()
|
||||
|
||||
stats = LBStatistics()
|
||||
|
||||
cmd = 'echo "show stat" | ' \
|
||||
|
||||
@@ -24,8 +24,8 @@ from libra.openstack.common import importutils
|
||||
from libra.common.options import Options, setup_logging
|
||||
from libra.worker.drivers.base import known_drivers
|
||||
from libra.worker.drivers.haproxy.services_base import haproxy_services
|
||||
from libra.worker.stats_client import stats_manager
|
||||
from libra.worker.worker import config_manager
|
||||
from libra.worker.stats_client import stats_thread
|
||||
from libra.worker.worker import config_thread
|
||||
|
||||
|
||||
class EventServer(object):
|
||||
@@ -118,8 +118,8 @@ def main():
|
||||
|
||||
# Tasks to execute in parallel
|
||||
task_list = [
|
||||
(config_manager, (logger, driver, args.server, args.reconnect_sleep)),
|
||||
(stats_manager, (logger, driver, args.stats_poll))
|
||||
(config_thread, (logger, driver, args.server, args.reconnect_sleep)),
|
||||
(stats_thread, (logger, driver, args.stats_poll))
|
||||
]
|
||||
|
||||
if args.nodaemon:
|
||||
|
||||
@@ -14,8 +14,19 @@
|
||||
|
||||
import eventlet
|
||||
|
||||
from libra.common.exc import ServiceUnavailable
|
||||
|
||||
def stats_manager(logger, driver, stats_poll):
|
||||
|
||||
def record_stats(logger, http_stats, tcp_stats):
|
||||
""" Permanently record load balancer statistics. """
|
||||
logger.debug("[stats] HTTP bytes in/out: (%d, %d)" %
|
||||
(http_stats.bytes_in, http_stats.bytes_out))
|
||||
logger.debug("[stats] TCP bytes in/out: (%d, %d)" %
|
||||
(tcp_stats.bytes_in, tcp_stats.bytes_out))
|
||||
|
||||
|
||||
def stats_thread(logger, driver, stats_poll):
|
||||
""" Statistics thread function. """
|
||||
logger.debug("[stats] Statistics gathering process started.")
|
||||
logger.debug("[stats] Polling interval: %d" % stats_poll)
|
||||
|
||||
@@ -28,14 +39,14 @@ def stats_manager(logger, driver, stats_poll):
|
||||
"[stats] Driver does not implement statisics gathering."
|
||||
)
|
||||
break
|
||||
except ServiceUnavailable:
|
||||
logger.warn("[stats] Unable to get statistics at this time.")
|
||||
except Exception as e:
|
||||
logger.critical("[stats] Exception: %s, %s" % (e.__class__, e))
|
||||
break
|
||||
else:
|
||||
record_stats(logger, http_stats, tcp_stats)
|
||||
|
||||
logger.debug("[stats] HTTP bytes in/out: (%d, %d)" %
|
||||
(http_stats.bytes_in, http_stats.bytes_out))
|
||||
logger.debug("[stats] TCP bytes in/out: (%d, %d)" %
|
||||
(tcp_stats.bytes_in, tcp_stats.bytes_out))
|
||||
eventlet.sleep(stats_poll)
|
||||
|
||||
logger.info("[stats] Statistics gathering process terminated.")
|
||||
|
||||
@@ -47,7 +47,8 @@ def handler(worker, job):
|
||||
return response
|
||||
|
||||
|
||||
def config_manager(logger, driver, servers, reconnect_sleep):
|
||||
def config_thread(logger, driver, servers, reconnect_sleep):
|
||||
""" Worker thread function. """
|
||||
# Version of the JSON message format that this worker understands.
|
||||
msg_fmt_version = "1.0"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user