
Refactoring amphora stats driver interface

Previously, the stats driver was responsible for parsing the health
message, which should have been done prior to passing the stats to the
driver interface.

Removed the driver interface for the health updater because it is core
Octavia functionality.

The stats driver interface is now a singleton and can load multiple drivers.

Both the amphora health manager and provider statistics now use the
new driver interface.

Co-Authored-By: Stephanie Djajadi <stephanie.djajadi@gmail.com>
Co-Authored-By: Adam Harwell <flux.adam@gmail.com>
Change-Id: I3a013aebd1eb89cd4f983fbf4f8ae8d6639548cd
Patch set: changes/11/737111/30
Authored by asingh12 (1 year ago); committed by Adam Harwell. Commit: 5092597f6b
Changed files (31; lines changed in parentheses):

  1. etc/octavia.conf (12)
  2. octavia/amphorae/drivers/driver_base.py (58)
  3. octavia/amphorae/drivers/health/heartbeat_udp.py (643)
  4. octavia/amphorae/drivers/noop_driver/driver.py (12)
  5. octavia/api/drivers/driver_agent/driver_updater.py (40)
  6. octavia/common/config.py (18)
  7. octavia/common/data_models.py (9)
  8. octavia/common/stats.py (8)
  9. octavia/controller/healthmanager/health_drivers/update_base.py (27)
  10. octavia/controller/healthmanager/health_drivers/update_db.py (606)
  11. octavia/db/models.py (1)
  12. octavia/db/repositories.py (34)
  13. octavia/opts.py (2)
  14. octavia/statistics/__init__.py (0)
  15. octavia/statistics/drivers/__init__.py (0)
  16. octavia/statistics/drivers/logger.py (18)
  17. octavia/statistics/drivers/update_db.py (43)
  18. octavia/statistics/stats_base.py (60)
  19. octavia/tests/functional/db/test_repositories.py (82)
  20. octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py (1476)
  21. octavia/tests/unit/amphorae/drivers/test_noop_amphoraloadbalancer_driver.py (14)
  22. octavia/tests/unit/api/drivers/driver_agent/test_driver_updater.py (70)
  23. octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py (38)
  24. octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py (1476)
  25. octavia/tests/unit/statistics/__init__.py (0)
  26. octavia/tests/unit/statistics/drivers/__init__.py (0)
  27. octavia/tests/unit/statistics/drivers/test_logger.py (26)
  28. octavia/tests/unit/statistics/drivers/test_update_db.py (78)
  29. octavia/tests/unit/statistics/test_stats_base.py (97)
  30. releasenotes/notes/stats-update-drivers-interface-changes-c8f2bf3b02eec767.yaml (17)
  31. setup.cfg (9)
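Reviewer note: before the per-file hunks, a minimal sketch of what a statistics driver looks like under the refactored interface. This is hedged: it is modeled on octavia/statistics/stats_base.py and octavia/statistics/drivers/logger.py from this change (whose hunks are not shown on this page), and assumes the abstract base is StatsDriverMixin with a single update_stats() hook that receives pre-parsed ListenerStatistics objects.

    # Sketch of a stats driver under the new interface (assumed base class
    # name and signature, per stats_base.py in this change).
    from oslo_log import log as logging

    from octavia.statistics import stats_base

    LOG = logging.getLogger(__name__)


    class StatsLogger(stats_base.StatsDriverMixin):
        def update_stats(self, listener_stats, deltas=False):
            # listener_stats: a list of data_models.ListenerStatistics,
            # already parsed out of the heartbeat by core Octavia code.
            # deltas=True means the values are increments (heartbeat v3+),
            # not absolute counters.
            for stats_object in listener_stats:
                LOG.info('Listener stats: %s', stats_object.get_stats())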

etc/octavia.conf (12 lines changed)

@@ -119,12 +119,6 @@
# health_check_interval = 3
# sock_rlimit = 0
# Health/StatsUpdate options are
# *_db
# *_logger
# health_update_driver = health_db
# stats_update_driver = stats_db
[keystone_authtoken]
# This group of config options are imported from keystone middleware. Thus the
# option names should match the names declared in the middleware.
@@ -341,8 +335,14 @@
#
# distributor_driver = distributor_noop_driver
#
# Statistics update driver options are stats_db
# stats_logger
# Multiple values may be specified as a comma-separated list.
# statistics_drivers = stats_db
# Load balancer topology options are SINGLE, ACTIVE_STANDBY
# loadbalancer_topology = SINGLE
# user_data_config_drive = False
# amphora_delete_retries = 5
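A hedged example of the operator-facing result (option names as added above; a value still set under the old [health_manager] stats_update_driver is picked up through the deprecation mapping shown later in octavia/common/config.py):

    [controller_worker]
    # Load both bundled drivers; multiple values may be specified as a
    # comma-separated list.
    statistics_drivers = stats_db, stats_logger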

octavia/amphorae/drivers/driver_base.py (58 lines changed)

@@ -189,23 +189,6 @@ class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
neutron network to utilize.
"""
def start_health_check(self, health_mixin):
"""Start health checks.
:param health_mixin: health mixin object
:type health_mixin: HealthMixin
Starts listener process and calls HealthMixin to update
databases information.
"""
def stop_health_check(self):
"""Stop health checks.
Stops listener process and calls HealthMixin to update
databases information.
"""
def upload_cert_amp(self, amphora, pem_file):
"""Upload cert info to the amphora.
@@ -242,47 +225,6 @@ class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
"""
class HealthMixin(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def update_health(self, health):
"""Return ceilometer ready health
:param health: health information emitted from the amphora
:type health: bool
:returns: return health
At this moment, we just build the basic structure for testing, will
add more function along with the development, eventually, we want it
return:
map: {"amphora-status":HEALTHY, loadbalancers: {"loadbalancer-id":
{"loadbalancer-status": HEALTHY,
"listeners":{"listener-id":{"listener-status":HEALTHY,
"nodes":{"node-id":HEALTHY, ...}}, ...}, ...}}
only items whose health has changed need to be submitted
awesome update code
"""
class StatsMixin(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def update_stats(self, stats):
"""Return ceilometer ready stats
:param stats: statistic information emitted from the amphora
:type stats: string
:returns: return stats
At this moment, we just build the basic structure for testing, will
add more function along with the development, eventually, we want it
return:
uses map {"loadbalancer-id":{"listener-id":
{"bytes-in": 123, "bytes_out":123, "active_connections":123,
"total_connections", 123}, ...}
elements are named to keep it extsnsible for future versions
awesome update code and code to send to ceilometer
"""
class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
"""Abstract mixin class for VRRP support in loadbalancer amphorae

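The removed HealthMixin/StatsMixin abstractions are replaced by stevedore-loaded drivers. The setup.cfg hunk is not shown on this page, but the registration presumably looks along these lines (assumed entry-point namespace and class names, inferred from the new octavia/statistics tree in the file list):

    [entry_points]
    octavia.statistics.drivers =
        stats_db = octavia.statistics.drivers.update_db:StatsUpdateDb
        stats_logger = octavia.statistics.drivers.logger:StatsLogger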
octavia/amphorae/drivers/health/heartbeat_udp.py (643 lines changed)

@@ -13,42 +13,32 @@
# under the License.
from concurrent import futures
import datetime
import socket
import time
import timeit
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
import sqlalchemy
from stevedore import driver as stevedore_driver
from octavia.amphorae.backends.health_daemon import status_message
from octavia.common import constants
from octavia.common import data_models
from octavia.common import exceptions
from octavia.db import repositories
from octavia.db import api as db_api
from octavia.db import repositories as repo
from octavia.statistics import stats_base
UDP_MAX_SIZE = 64 * 1024
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def update_health(obj, srcaddr):
handler = stevedore_driver.DriverManager(
namespace='octavia.amphora.health_update_drivers',
name=CONF.health_manager.health_update_driver,
invoke_on_load=True
).driver
handler.update_health(obj, srcaddr)
def update_stats(obj, srcaddr):
handler = stevedore_driver.DriverManager(
namespace='octavia.amphora.stats_update_drivers',
name=CONF.health_manager.stats_update_driver,
invoke_on_load=True
).driver
handler.update_stats(obj, srcaddr)
class UDPStatusGetter(object):
"""This class defines methods that will gather heatbeats
"""This class defines methods that will gather heartbeats
The heartbeats are transmitted via UDP and this class will bind to a port
and absorb them
@@ -67,7 +57,7 @@ class UDPStatusGetter(object):
max_workers=CONF.health_manager.health_update_threads)
self.stats_executor = futures.ProcessPoolExecutor(
max_workers=CONF.health_manager.stats_update_threads)
self.repo = repositories.Repositories().amphorahealth
self.health_updater = UpdateHealthDb()
def update(self, key, ip, port):
"""Update the running config for the udp socket server
@@ -99,91 +89,7 @@ class UDPStatusGetter(object):
"""Waits for a UDP heart beat to be sent.
:return: Returns the unwrapped payload and addr that sent the
heartbeat. The format of the obj from the UDP sender
can be seen below. Note that listener_1 has no pools
and listener_4 has no members.
Example::
{
"listeners": {
"listener_uuid_1": {
"pools": {},
"status": "OPEN",
"stats": {
"conns": 0,
"rx": 0,
"tx": 0
}
},
"listener_uuid_2": {
"pools": {
"pool_uuid_1": {
"members": [{
"member_uuid_1": "DOWN"
},
{
"member_uuid_2": "DOWN"
},
{
"member_uuid_3": "DOWN"
},
{
"member_uuid_4": "DOWN"
}
]
}
},
"status": "OPEN",
"stats": {
"conns": 0,
"rx": 0,
"tx": 0
}
},
"listener_uuid_3": {
"pools": {
"pool_uuid_2": {
"members": [{
"member_uuid_5": "DOWN"
},
{
"member_uuid_6": "DOWN"
},
{
"member_uuid_7": "DOWN"
},
{
"member_uuid_8": "DOWN"
}
]
}
},
"status": "OPEN",
"stats": {
"conns": 0,
"rx": 0,
"tx": 0
}
},
"listener_uuid_4": {
"pools": {
"pool_uuid_3": {
"members": []
}
},
"status": "OPEN",
"stats": {
"conns": 0,
"rx": 0,
"tx": 0
}
}
},
"id": "amphora_uuid",
"seq": 1033
}
heartbeat.
"""
(data, srcaddr) = self.sock.recvfrom(UDP_MAX_SIZE)
LOG.debug('Received packet from %s', srcaddr)
@@ -211,5 +117,526 @@ class UDPStatusGetter(object):
'heartbeat packet. Ignoring this packet. '
'Exception: %s', e)
else:
self.health_executor.submit(update_health, obj, srcaddr)
self.stats_executor.submit(update_stats, obj, srcaddr)
self.health_executor.submit(self.health_updater.update_health,
obj, srcaddr)
self.stats_executor.submit(update_stats, obj)
def update_stats(health_message):
"""Parses the health message then passes it to the stats driver(s)
:param health_message: The health message containing the listener stats
:type health_message: dict
Example V1 message::
health = {
"id": "<amphora_id>",
"listeners": {
"<listener_id>": {
"status": "OPEN",
"stats": {
"ereq": 0,
"conns": 0,
"totconns": 0,
"rx": 0,
"tx": 0,
},
"pools": {
"<pool_id>": {
"status": "UP",
"members": {"<member_id>": "ONLINE"}
}
}
}
}
}
Example V2 message::
{"id": "<amphora_id>",
"seq": 67,
"listeners": {
"<listener_id>": {
"status": "OPEN",
"stats": {
"tx": 0,
"rx": 0,
"conns": 0,
"totconns": 0,
"ereq": 0
}
}
},
"pools": {
"<pool_id>:<listener_id>": {
"status": "UP",
"members": {
"<member_id>": "no check"
}
}
},
"ver": 2,
"recv_time": time.time()
}
Example V3 message::
Same as V2 message, except values are deltas rather than absolutes.
"""
version = health_message.get("ver", 2)
deltas = False
if version >= 3:
deltas = True
amphora_id = health_message.get('id')
listeners = health_message.get('listeners', {})
listener_stats = []
for listener_id, listener in listeners.items():
listener_dict = listener.get('stats')
stats_model = data_models.ListenerStatistics(
listener_id=listener_id,
amphora_id=amphora_id,
bytes_in=listener_dict.get('rx'),
bytes_out=listener_dict.get('tx'),
active_connections=listener_dict.get('conns'),
total_connections=listener_dict.get('totconns'),
request_errors=listener_dict.get('ereq'),
received_time=health_message.get('recv_time')
)
LOG.debug("Listener %s / Amphora %s stats: %s",
listener_id, amphora_id, stats_model.get_stats())
listener_stats.append(stats_model)
stats_base.update_stats_via_driver(listener_stats, deltas=deltas)
class UpdateHealthDb:
def __init__(self):
super().__init__()
# First, set up the repositories: amphora, listener, member (node), and pool
self.amphora_repo = repo.AmphoraRepository()
self.amphora_health_repo = repo.AmphoraHealthRepository()
self.listener_repo = repo.ListenerRepository()
self.loadbalancer_repo = repo.LoadBalancerRepository()
self.member_repo = repo.MemberRepository()
self.pool_repo = repo.PoolRepository()
@staticmethod
def _update_status(session, repo, entity_type,
entity_id, new_op_status, old_op_status):
if old_op_status.lower() != new_op_status.lower():
LOG.debug("%s %s status has changed from %s to "
"%s, updating db.",
entity_type, entity_id, old_op_status,
new_op_status)
repo.update(session, entity_id, operating_status=new_op_status)
def update_health(self, health, srcaddr):
# The executor will eat any exceptions from the update_health code
# so we need to wrap it and log the unhandled exception
start_time = timeit.default_timer()
try:
self._update_health(health, srcaddr)
except Exception as e:
LOG.exception('Health update for amphora %(amp)s encountered '
'error %(err)s. Skipping health update.',
{'amp': health['id'], 'err': e})
# TODO(johnsom) We need to set a warning threshold here
LOG.debug('Health Update finished in: %s seconds',
timeit.default_timer() - start_time)
# Health heartbeat messages that predate versioning, combined with UDP
# listeners, need the expected listener count adjusted. This is for
# backward compatibility with pre-versioning (Rocky) heartbeat amphorae.
def _update_listener_count_for_UDP(self, session, db_lb,
expected_listener_count):
# The amphora agent does not send health for a UDP listener until the
# listener's default_pool has at least one enabled member. Until then,
# identify such UDP listeners and ignore them by adjusting
# expected_listener_count.
for list_id, list_db in db_lb.get('listeners', {}).items():
need_remove = False
if list_db['protocol'] == constants.PROTOCOL_UDP:
listener = self.listener_repo.get(session, id=list_id)
enabled_members = ([member
for member in
listener.default_pool.members
if member.enabled]
if listener.default_pool else [])
if listener.default_pool:
if not listener.default_pool.members:
need_remove = True
elif not enabled_members:
need_remove = True
else:
need_remove = True
if need_remove:
expected_listener_count = expected_listener_count - 1
return expected_listener_count
def _update_health(self, health, srcaddr):
"""This function is to update db info based on amphora status
:param health: map object that contains amphora, listener, member info
:type health: dict
:returns: None
The input v1 health data structure is shown as below::
health = {
"id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"status": constants.OPEN, "pools": {
"pool-id-1": {"status": constants.UP,
"members": {
"member-id-1": constants.ONLINE}
}
}
}
}
}
Example V2 message::
{"id": "<amphora_id>",
"seq": 67,
"listeners": {
"<listener_id>": {
"status": "OPEN",
"stats": {
"tx": 0,
"rx": 0,
"conns": 0,
"totconns": 0,
"ereq": 0
}
}
},
"pools": {
"<pool_id>:<listener_id>": {
"status": "UP",
"members": {
"<member_id>": "no check"
}
}
},
"ver": 2
}
"""
session = db_api.get_session()
# We need to see if all of the listeners are reporting in
db_lb = self.amphora_repo.get_lb_for_health_update(session,
health['id'])
ignore_listener_count = False
if db_lb:
expected_listener_count = 0
if ('PENDING' in db_lb['provisioning_status'] or
not db_lb['enabled']):
ignore_listener_count = True
else:
for key, listener in db_lb.get('listeners', {}).items():
# disabled listeners don't report from the amphora
if listener['enabled']:
expected_listener_count += 1
# If this is a heartbeat older than versioning, handle
# UDP special for backward compatibility.
if 'ver' not in health:
udp_listeners = [
l for k, l in db_lb.get('listeners', {}).items()
if l['protocol'] == constants.PROTOCOL_UDP]
if udp_listeners:
expected_listener_count = (
self._update_listener_count_for_UDP(
session, db_lb, expected_listener_count))
else:
# If this is not a spare amp, log and skip it.
amp = self.amphora_repo.get(session, id=health['id'])
if not amp or amp.load_balancer_id:
# This is debug and not warning because this can happen under
# normal deleting operations.
LOG.debug('Received a health heartbeat from amphora %s with '
'IP %s that should not exist. This amphora may be '
'in the process of being deleted, in which case you '
'will only see this message a few '
'times', health['id'], srcaddr)
if not amp:
LOG.warning('The amphora %s with IP %s is missing from '
'the DB, so it cannot be automatically '
'deleted (the compute_id is unknown). An '
'operator must manually delete it from the '
'compute service.', health['id'], srcaddr)
return
# delete the amp right there
try:
compute = stevedore_driver.DriverManager(
namespace='octavia.compute.drivers',
name=CONF.controller_worker.compute_driver,
invoke_on_load=True
).driver
compute.delete(amp.compute_id)
return
except Exception as e:
LOG.info("Error deleting amp %s with IP %s Error: %s",
health['id'], srcaddr, e)
expected_listener_count = 0
listeners = health['listeners']
# Do not update amphora health if the reporting listener count
# does not match the expected listener count
if len(listeners) == expected_listener_count or ignore_listener_count:
lock_session = db_api.get_session(autocommit=False)
# if we're running too far behind, warn and bail
proc_delay = time.time() - health['recv_time']
hb_interval = CONF.health_manager.heartbeat_interval
# TODO(johnsom) We need to set a warning threshold here, and
# escalate to critical when it reaches the
# heartbeat_interval
if proc_delay >= hb_interval:
LOG.warning('Amphora %(id)s health message was processed too '
'slowly: %(delay)ss! The system may be overloaded '
'or otherwise malfunctioning. This heartbeat has '
'been ignored and no update was made to the '
'amphora health entry. THIS IS NOT GOOD.',
{'id': health['id'], 'delay': proc_delay})
return
# if the input amphora is healthy, we update its db info
try:
self.amphora_health_repo.replace(
lock_session, health['id'],
last_update=(datetime.datetime.utcnow()))
lock_session.commit()
except Exception:
with excutils.save_and_reraise_exception():
lock_session.rollback()
else:
LOG.warning('Amphora %(id)s health message reports %(found)i '
'listeners when %(expected)i expected',
{'id': health['id'], 'found': len(listeners),
'expected': expected_listener_count})
# Don't try to update status for spares pool amphora
if not db_lb:
return
processed_pools = []
potential_offline_pools = {}
# We got a heartbeat so lb is healthy until proven otherwise
if db_lb[constants.ENABLED] is False:
lb_status = constants.OFFLINE
else:
lb_status = constants.ONLINE
health_msg_version = health.get('ver', 0)
for listener_id in db_lb.get(constants.LISTENERS, {}):
db_listener = db_lb[constants.LISTENERS][listener_id]
db_op_status = db_listener[constants.OPERATING_STATUS]
listener_status = None
listener = None
if listener_id not in listeners:
if (db_listener[constants.ENABLED] and
db_lb[constants.PROVISIONING_STATUS] ==
constants.ACTIVE):
listener_status = constants.ERROR
else:
listener_status = constants.OFFLINE
else:
listener = listeners[listener_id]
# OPEN = HAProxy listener status nbconn < maxconn
if listener.get('status') == constants.OPEN:
listener_status = constants.ONLINE
# FULL = HAProxy listener status not nbconn < maxconn
elif listener.get('status') == constants.FULL:
listener_status = constants.DEGRADED
if lb_status == constants.ONLINE:
lb_status = constants.DEGRADED
else:
LOG.warning(('Listener %(list)s reported status of '
'%(status)s'),
{'list': listener_id,
'status': listener.get('status')})
try:
if (listener_status is not None and
listener_status != db_op_status):
self._update_status(
session, self.listener_repo, constants.LISTENER,
listener_id, listener_status, db_op_status)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Listener %s is not in DB", listener_id)
if not listener:
continue
if health_msg_version < 2:
raw_pools = listener['pools']
# normalize the pool IDs. Single process listener pools
# have the listener id appended with an ':' separator.
# Old multi-process listener pools only have a pool ID.
# This makes sure the keys are only pool IDs.
pools = {(k + ' ')[:k.rfind(':')]: v for k, v in
raw_pools.items()}
for db_pool_id in db_lb.get('pools', {}):
# If we saw this pool already on another listener, skip it.
if db_pool_id in processed_pools:
continue
db_pool_dict = db_lb['pools'][db_pool_id]
lb_status = self._process_pool_status(
session, db_pool_id, db_pool_dict, pools,
lb_status, processed_pools, potential_offline_pools)
if health_msg_version >= 2:
raw_pools = health['pools']
# normalize the pool IDs. Single process listener pools
# have the listener id appended with an ':' separator.
# Old multi-process listener pools only have a pool ID.
# This makes sure the keys are only pool IDs.
pools = {(k + ' ')[:k.rfind(':')]: v for k, v in raw_pools.items()}
for db_pool_id in db_lb.get('pools', {}):
# If we saw this pool already, skip it.
if db_pool_id in processed_pools:
continue
db_pool_dict = db_lb['pools'][db_pool_id]
lb_status = self._process_pool_status(
session, db_pool_id, db_pool_dict, pools,
lb_status, processed_pools, potential_offline_pools)
for pool_id in potential_offline_pools:
# Skip if we eventually found a status for this pool
if pool_id in processed_pools:
continue
try:
# If the database doesn't already show the pool offline, update
if potential_offline_pools[pool_id] != constants.OFFLINE:
self._update_status(
session, self.pool_repo, constants.POOL,
pool_id, constants.OFFLINE,
potential_offline_pools[pool_id])
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Pool %s is not in DB", pool_id)
# Update the load balancer status last
try:
if lb_status != db_lb['operating_status']:
self._update_status(
session, self.loadbalancer_repo,
constants.LOADBALANCER, db_lb['id'], lb_status,
db_lb[constants.OPERATING_STATUS])
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Load balancer %s is not in DB", db_lb.id)
def _process_pool_status(
self, session, pool_id, db_pool_dict, pools, lb_status,
processed_pools, potential_offline_pools):
pool_status = None
if pool_id not in pools:
# If we don't have a status update for this pool_id
# add it to the list of potential offline pools and continue.
# We will check the potential offline pool list after we
# finish processing the status updates from all of the listeners.
potential_offline_pools[pool_id] = db_pool_dict['operating_status']
return lb_status
pool = pools[pool_id]
processed_pools.append(pool_id)
# UP = HAProxy backend has working or no servers
if pool.get('status') == constants.UP:
pool_status = constants.ONLINE
# DOWN = HAProxy backend has no working servers
elif pool.get('status') == constants.DOWN:
pool_status = constants.ERROR
lb_status = constants.ERROR
else:
LOG.warning(('Pool %(pool)s reported status of '
'%(status)s'),
{'pool': pool_id,
'status': pool.get('status')})
# Deal with the members that are reporting from
# the Amphora
members = pool['members']
for member_id in db_pool_dict.get('members', {}):
member_status = None
member_db_status = (
db_pool_dict['members'][member_id]['operating_status'])
if member_id not in members:
if member_db_status != constants.NO_MONITOR:
member_status = constants.OFFLINE
else:
status = members[member_id]
# Member status can be "UP" or "UP #/#"
# (transitional)
if status.startswith(constants.UP):
member_status = constants.ONLINE
# Member status can be "DOWN" or "DOWN #/#"
# (transitional)
elif status.startswith(constants.DOWN):
member_status = constants.ERROR
if pool_status == constants.ONLINE:
pool_status = constants.DEGRADED
if lb_status == constants.ONLINE:
lb_status = constants.DEGRADED
elif status == constants.DRAIN:
member_status = constants.DRAINING
elif status == constants.MAINT:
member_status = constants.OFFLINE
elif status == constants.NO_CHECK:
member_status = constants.NO_MONITOR
elif status == constants.RESTARTING:
# RESTARTING means that keepalived is restarting and a down
# member has been detected, the real status of the member
# is not clear, it might mean that the checker hasn't run
# yet.
# In this case, keep previous member_status, and wait for a
# non-transitional status.
pass
else:
LOG.warning('Member %(mem)s reported '
'status of %(status)s',
{'mem': member_id,
'status': status})
try:
if (member_status is not None and
member_status != member_db_status):
self._update_status(
session, self.member_repo, constants.MEMBER,
member_id, member_status, member_db_status)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Member %s is not able to update "
"in DB", member_id)
try:
if (pool_status is not None and
pool_status != db_pool_dict['operating_status']):
self._update_status(
session, self.pool_repo, constants.POOL,
pool_id, pool_status, db_pool_dict['operating_status'])
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Pool %s is not in DB", pool_id)
return lb_status
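A hedged usage sketch of the new parsing path above: a v3 (delta) heartbeat goes through the module-level update_stats(), which builds ListenerStatistics objects and hands them to every configured driver via stats_base.update_stats_via_driver(). It assumes an initialized Octavia configuration so the driver list can be loaded.

    import time

    from octavia.amphorae.drivers.health import heartbeat_udp

    health_message = {
        'id': 'amphora-uuid',
        'ver': 3,                      # v3+ stats are deltas, not absolutes
        'recv_time': time.time(),
        'listeners': {
            'listener-uuid': {
                'status': 'OPEN',
                'stats': {'tx': 100, 'rx': 200, 'conns': 1,
                          'totconns': 5, 'ereq': 0},
            },
        },
        'pools': {},
    }

    # Internally: stats_base.update_stats_via_driver(stats, deltas=True)
    heartbeat_udp.update_stats(health_message)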

octavia/amphorae/drivers/noop_driver/driver.py (12 lines changed)

@@ -19,18 +19,6 @@ from octavia.amphorae.drivers import driver_base
LOG = logging.getLogger(__name__)
class LoggingUpdate(object):
def update_stats(self, stats):
LOG.debug("Amphora %s no-op, update stats %s",
self.__class__.__name__, stats)
self.stats = stats
def update_health(self, health):
LOG.debug("Amphora %s no-op, update health %s",
self.__class__.__name__, health)
self.health = health
class NoopManager(object):
def __init__(self):

octavia/api/drivers/driver_agent/driver_updater.py (40 lines changed)

@@ -12,13 +12,17 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from octavia_lib.api.drivers import exceptions as driver_exceptions
from octavia_lib.common import constants as lib_consts
from octavia.common import constants as consts
from octavia.common import data_models
from octavia.common import utils
from octavia.db import api as db_apis
from octavia.db import repositories as repo
from octavia.statistics import stats_base
class DriverUpdater(object):
@@ -151,24 +155,34 @@ class DriverUpdater(object):
:returns: None
"""
listener_stats = statistics.get(lib_consts.LISTENERS, [])
stats_objects = []
for stat in listener_stats:
try:
listener_id = stat.pop('id')
stats_obj = data_models.ListenerStatistics(
listener_id=stat['id'],
bytes_in=stat['bytes_in'],
bytes_out=stat['bytes_out'],
active_connections=stat['active_connections'],
total_connections=stat['total_connections'],
request_errors=stat['request_errors'],
received_time=time.time()
)
stats_objects.append(stats_obj)
except Exception as e:
return {
lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_FAILED,
lib_consts.FAULT_STRING: str(e),
lib_consts.STATS_OBJECT: lib_consts.LISTENERS}
# Provider drivers other than the amphora driver do not have
# an amphora ID, use the listener ID again here to meet the
# constraint requirement.
try:
self.listener_stats_repo.replace(self.db_session, listener_id,
listener_id, **stat)
except Exception as e:
return {
lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_FAILED,
lib_consts.FAULT_STRING: str(e),
lib_consts.STATS_OBJECT: lib_consts.LISTENERS,
lib_consts.STATS_OBJECT_ID: listener_id}
# Provider drivers other than the amphora driver do not have
# an amphora ID, use the listener ID again here to meet the
# constraint requirement.
try:
if stats_objects:
stats_base.update_stats_via_driver(stats_objects)
except Exception as e:
return {
lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_FAILED,
lib_consts.FAULT_STRING: str(e),
lib_consts.STATS_OBJECT: lib_consts.LISTENERS}
return {lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_OK}
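For clarity, a hedged sketch of the payload shape update_listener_statistics() consumes; the key names are taken directly from the code above, and `updater` is assumed to be an already-constructed DriverUpdater instance (hypothetical name for illustration).

    from octavia_lib.common import constants as lib_consts

    statistics = {
        lib_consts.LISTENERS: [{
            'id': 'listener-uuid',   # becomes ListenerStatistics.listener_id
            'bytes_in': 1000,
            'bytes_out': 2000,
            'active_connections': 10,
            'total_connections': 50,
            'request_errors': 0,
        }],
    }

    result = updater.update_listener_statistics(statistics)
    # Success: {lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_OK}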

octavia/common/config.py (18 lines changed)

@@ -261,7 +261,7 @@ networking_opts = [
"neutron RBAC policies.")),
]
healthmanager_opts = [
health_manager_opts = [
cfg.IPOpt('bind_ip', default='127.0.0.1',
help=_('IP address the controller will listen on for '
'heart beats')),
@@ -303,11 +303,12 @@ healthmanager_opts = [
mutable=True,
help=_('Sleep time between sending heartbeats.')),
# Used for updating health and stats
# Used for updating health
cfg.StrOpt('health_update_driver', default='health_db',
help=_('Driver for updating amphora health system.')),
cfg.StrOpt('stats_update_driver', default='stats_db',
help=_('Driver for updating amphora statistics.')),
help=_('Driver for updating amphora health system.'),
deprecated_for_removal=True,
deprecated_reason=_('This driver interface was removed.'),
deprecated_since='Victoria'),
]
oslo_messaging_opts = [
@@ -485,6 +486,11 @@ controller_worker_opts = [
cfg.StrOpt('distributor_driver',
default='distributor_noop_driver',
help=_('Name of the distributor driver to use')),
cfg.ListOpt('statistics_drivers', default=['stats_db'],
deprecated_name='stats_update_driver',
deprecated_group='health_manager',
deprecated_since='Victoria',
help=_('List of drivers for updating amphora statistics.')),
cfg.StrOpt('loadbalancer_topology',
default=constants.TOPOLOGY_SINGLE,
choices=constants.SUPPORTED_LB_TOPOLOGIES,
@@ -846,7 +852,7 @@ cfg.CONF.register_opts(keepalived_vrrp_opts, group='keepalived_vrrp')
cfg.CONF.register_opts(task_flow_opts, group='task_flow')
cfg.CONF.register_opts(house_keeping_opts, group='house_keeping')
cfg.CONF.register_opts(certificate_opts, group='certificates')
cfg.CONF.register_opts(healthmanager_opts, group='health_manager')
cfg.CONF.register_opts(health_manager_opts, group='health_manager')
cfg.CONF.register_opts(nova_opts, group='nova')
cfg.CONF.register_opts(cinder_opts, group='cinder')
cfg.CONF.register_opts(glance_opts, group='glance')
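The deprecation wiring above relies on standard oslo.config behavior; a self-contained sketch (plain oslo.config, not Octavia-specific) showing how a value set under the deprecated name/group satisfies the new list option:

    from oslo_config import cfg

    opts = [
        cfg.ListOpt('statistics_drivers', default=['stats_db'],
                    deprecated_name='stats_update_driver',
                    deprecated_group='health_manager',
                    help='List of drivers for updating amphora statistics.'),
    ]

    CONF = cfg.CONF
    CONF.register_opts(opts, group='controller_worker')
    CONF([], project='demo')

    # Falls back to [health_manager]/stats_update_driver if only that is set.
    print(CONF.controller_worker.statistics_drivers)  # ['stats_db'] by default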

octavia/common/data_models.py (9 lines changed)

@@ -182,7 +182,7 @@ class ListenerStatistics(BaseDataModel):
def __init__(self, listener_id=None, amphora_id=None, bytes_in=0,
bytes_out=0, active_connections=0,
total_connections=0, request_errors=0):
total_connections=0, request_errors=0, received_time=0.0):
self.listener_id = listener_id
self.amphora_id = amphora_id
self.bytes_in = bytes_in
@@ -190,6 +190,7 @@ class ListenerStatistics(BaseDataModel):
self.active_connections = active_connections
self.total_connections = total_connections
self.request_errors = request_errors
self.received_time = received_time
def get_stats(self):
stats = {
@@ -201,8 +202,12 @@ class ListenerStatistics(BaseDataModel):
}
return stats
def __iadd__(self, other):
def db_fields(self):
fields = self.to_dict()
fields.pop('received_time')
return fields
def __iadd__(self, other):
if isinstance(other, ListenerStatistics):
self.bytes_in += other.bytes_in
self.bytes_out += other.bytes_out
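A hedged sketch exercising the data model changes above: received_time rides along for processing-delay checks but is stripped by db_fields() because it is not a column in the listener statistics table, and __iadd__ accumulates the counter fields, which is what makes delta (v3) stats cheap to fold together.

    import time

    from octavia.common import data_models

    a = data_models.ListenerStatistics(
        listener_id='l1', amphora_id='amp1', bytes_in=10, bytes_out=20,
        active_connections=2, total_connections=5, request_errors=1,
        received_time=time.time())
    b = data_models.ListenerStatistics(
        listener_id='l1', amphora_id='amp1', bytes_in=5, bytes_out=5)

    a += b                           # counters accumulate via __iadd__
    assert a.bytes_in == 15

    fields = a.db_fields()           # to_dict() minus 'received_time'
    assert 'received_time' not in fields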

octavia/common/stats.py (8 lines changed)

@@ -43,7 +43,13 @@ class StatsMixin(object):
statistics += db_l
amp = self.repo_amphora.get(session, id=db_l.amphora_id)
if amp and amp.status == constants.AMPHORA_ALLOCATED:
# Amphora ID and Listener ID will be the same in the case that the
# stats are coming from a provider driver other than the `amphora`
# driver. In that case and when the current amphora is ALLOCATED
# are the only times we should include the *active* connections,
# because non-active amphora will have incorrect counts.
if (amp and amp.status == constants.AMPHORA_ALLOCATED) or (
db_l.amphora_id == db_l.listener_id):
statistics.active_connections += db_l.active_connections
return statistics
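The condition above reads as a small predicate; a hedged restatement (toy code, not Octavia's) of when a stats row's active connections count toward the listener total:

    AMPHORA_ALLOCATED = 'ALLOCATED'  # assumed constant value for illustration

    def counts_active(amp_status, amphora_id, listener_id):
        # Allocated amphorae report live gauges; provider-driver rows are
        # recognizable because amphora_id was set to the listener ID.
        return amp_status == AMPHORA_ALLOCATED or amphora_id == listener_id

    assert counts_active('ALLOCATED', 'amp-1', 'listener-1')
    assert counts_active(None, 'listener-1', 'listener-1')   # provider stats
    assert not counts_active('DELETED', 'amp-2', 'listener-1')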

octavia/controller/healthmanager/health_drivers/update_base.py (27 lines changed)

@@ -1,27 +0,0 @@
# Copyright 2018 GoDaddy
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class HealthUpdateBase(object):
@abc.abstractmethod
def update_health(self, health, srcaddr):
raise NotImplementedError()
class StatsUpdateBase(object):
@abc.abstractmethod
def update_stats(self, health_message, srcaddr):
raise NotImplementedError()

octavia/controller/healthmanager/health_drivers/update_db.py (606 lines changed)

@@ -1,606 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import time
import timeit
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
import sqlalchemy
from stevedore import driver as stevedore_driver
from octavia.common import constants
from octavia.common import data_models
from octavia.common import stats
from octavia.controller.healthmanager.health_drivers import update_base
from octavia.db import api as db_api
from octavia.db import repositories as repo
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class UpdateHealthDb(update_base.HealthUpdateBase):
def __init__(self):
super().__init__()
# first setup repo for amphora, listener,member(nodes),pool repo
self.amphora_repo = repo.AmphoraRepository()
self.amphora_health_repo = repo.AmphoraHealthRepository()
self.listener_repo = repo.ListenerRepository()
self.loadbalancer_repo = repo.LoadBalancerRepository()
self.member_repo = repo.MemberRepository()
self.pool_repo = repo.PoolRepository()
def _update_status(self, session, repo, entity_type,
entity_id, new_op_status, old_op_status):
message = {}
if old_op_status.lower() != new_op_status.lower():
LOG.debug("%s %s status has changed from %s to "
"%s, updating db.",
entity_type, entity_id, old_op_status,
new_op_status)
repo.update(session, entity_id, operating_status=new_op_status)
# Map the status for neutron-lbaas compatibility
if new_op_status == constants.DRAINING:
new_op_status = constants.ONLINE
message.update({constants.OPERATING_STATUS: new_op_status})
def update_health(self, health, srcaddr):
# The executor will eat any exceptions from the update_health code
# so we need to wrap it and log the unhandled exception
start_time = timeit.default_timer()
try:
self._update_health(health, srcaddr)
except Exception as e:
LOG.exception('Health update for amphora %(amp)s encountered '
'error %(err)s. Skipping health update.',
{'amp': health['id'], 'err': e})
# TODO(johnsom) We need to set a warning threshold here
LOG.debug('Health Update finished in: %s seconds',
timeit.default_timer() - start_time)
# Health heartbeat messsage pre-versioning with UDP listeners
# need to adjust the expected listener count
# This is for backward compatibility with Rocky pre-versioning
# heartbeat amphora.
def _update_listener_count_for_UDP(self, session, db_lb,
expected_listener_count):
# For udp listener, the udp health won't send out by amp agent.
# Once the default_pool of udp listener have the first enabled
# member, then the health will be sent out. So during this
# period, need to figure out the udp listener and ignore them
# by changing expected_listener_count.
for list_id, list_db in db_lb.get('listeners', {}).items():
need_remove = False
if list_db['protocol'] == constants.PROTOCOL_UDP:
listener = self.listener_repo.get(session, id=list_id)
enabled_members = ([member
for member in
listener.default_pool.members
if member.enabled]
if listener.default_pool else [])
if listener.default_pool:
if not listener.default_pool.members:
need_remove = True
elif not enabled_members:
need_remove = True
else:
need_remove = True
if need_remove:
expected_listener_count = expected_listener_count - 1
return expected_listener_count
def _update_health(self, health, srcaddr):
"""This function is to update db info based on amphora status
:param health: map object that contains amphora, listener, member info
:type map: string
:returns: null
The input v1 health data structure is shown as below::
health = {
"id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"status": constants.OPEN, "pools": {
"pool-id-1": {"status": constants.UP,
"members": {
"member-id-1": constants.ONLINE}
}
}
}
}
}
Example V2 message::
{"id": "<amphora_id>",
"seq": 67,
"listeners": {
"<listener_id>": {
"status": "OPEN",
"stats": {
"tx": 0,
"rx": 0,
"conns": 0,
"totconns": 0,
"ereq": 0
}
}
},
"pools": {
"<pool_id>:<listener_id>": {
"status": "UP",
"members": {
"<member_id>": "no check"
}
}
},
"ver": 2
}
"""
session = db_api.get_session()
# We need to see if all of the listeners are reporting in
db_lb = self.amphora_repo.get_lb_for_health_update(session,
health['id'])
ignore_listener_count = False
if db_lb:
expected_listener_count = 0
if ('PENDING' in db_lb['provisioning_status'] or
not db_lb['enabled']):
ignore_listener_count = True
else:
for key, listener in db_lb.get('listeners', {}).items():
# disabled listeners don't report from the amphora
if listener['enabled']:
expected_listener_count += 1
# If this is a heartbeat older than versioning, handle
# UDP special for backward compatibility.
if 'ver' not in health:
udp_listeners = [
l for k, l in db_lb.get('listeners', {}).items()
if l['protocol'] == constants.PROTOCOL_UDP]
if udp_listeners:
expected_listener_count = (
self._update_listener_count_for_UDP(
session, db_lb, expected_listener_count))
else:
# If this is not a spare amp, log and skip it.
amp = self.amphora_repo.get(session, id=health['id'])
if not amp or amp.load_balancer_id:
# This is debug and not warning because this can happen under
# normal deleting operations.
LOG.debug('Received a health heartbeat from amphora %s with '
'IP %s that should not exist. This amphora may be '
'in the process of being deleted, in which case you '
'will only see this message a few '
'times', health['id'], srcaddr)
if not amp:
LOG.warning('The amphora %s with IP %s is missing from '
'the DB, so it cannot be automatically '
'deleted (the compute_id is unknown). An '
'operator must manually delete it from the '
'compute service.', health['id'], srcaddr)
return
# delete the amp right there
try:
compute = stevedore_driver.DriverManager(
namespace='octavia.compute.drivers',
name=CONF.controller_worker.compute_driver,
invoke_on_load=True
).driver
compute.delete(amp.compute_id)
return
except Exception as e:
LOG.info("Error deleting amp %s with IP %s Error: %s",
health['id'], srcaddr, e)
expected_listener_count = 0
listeners = health['listeners']
# Do not update amphora health if the reporting listener count
# does not match the expected listener count
if len(listeners) == expected_listener_count or ignore_listener_count:
lock_session = db_api.get_session(autocommit=False)
# if we're running too far behind, warn and bail
proc_delay = time.time() - health['recv_time']
hb_interval = CONF.health_manager.heartbeat_interval
# TODO(johnsom) We need to set a warning threshold here, and
# escalate to critical when it reaches the
# heartbeat_interval
if proc_delay >= hb_interval:
LOG.warning('Amphora %(id)s health message was processed too '
'slowly: %(delay)ss! The system may be overloaded '
'or otherwise malfunctioning. This heartbeat has '
'been ignored and no update was made to the '
'amphora health entry. THIS IS NOT GOOD.',
{'id': health['id'], 'delay': proc_delay})
return
# if the input amphora is healthy, we update its db info
try:
self.amphora_health_repo.replace(
lock_session, health['id'],
last_update=(datetime.datetime.utcnow()))
lock_session.commit()
except Exception:
with excutils.save_and_reraise_exception():
lock_session.rollback()
else:
LOG.warning('Amphora %(id)s health message reports %(found)i '
'listeners when %(expected)i expected',
{'id': health['id'], 'found': len(listeners),
'expected': expected_listener_count})
# Don't try to update status for spares pool amphora
if not db_lb:
return
processed_pools = []
potential_offline_pools = {}
# We got a heartbeat so lb is healthy until proven otherwise
if db_lb[constants.ENABLED] is False:
lb_status = constants.OFFLINE
else:
lb_status = constants.ONLINE
health_msg_version = health.get('ver', 0)
for listener_id in db_lb.get(constants.LISTENERS, {}):
db_listener = db_lb[constants.LISTENERS][listener_id]
db_op_status = db_listener[constants.OPERATING_STATUS]
listener_status = None
listener = None
if listener_id not in listeners:
if (db_listener[constants.ENABLED] and
db_lb[constants.PROVISIONING_STATUS] ==
constants.ACTIVE):
listener_status = constants.ERROR
else:
listener_status = constants.OFFLINE
else:
listener = listeners[listener_id]
# OPEN = HAProxy listener status nbconn < maxconn
if listener.get('status') == constants.OPEN:
listener_status = constants.ONLINE
# FULL = HAProxy listener status not nbconn < maxconn
elif listener.get('status') == constants.FULL:
listener_status = constants.DEGRADED
if lb_status == constants.ONLINE:
lb_status = constants.DEGRADED
else:
LOG.warning(('Listener %(list)s reported status of '
'%(status)s'),
{'list': listener_id,
'status': listener.get('status')})
try:
if (listener_status is not None and
listener_status != db_op_status):
self._update_status(
session, self.listener_repo, constants.LISTENER,
listener_id, listener_status, db_op_status)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Listener %s is not in DB", listener_id)
if not listener:
continue
if health_msg_version < 2:
raw_pools = listener['pools']
# normalize the pool IDs. Single process listener pools
# have the listener id appended with an ':' seperator.
# Old multi-process listener pools only have a pool ID.
# This makes sure the keys are only pool IDs.
pools = {(k + ' ')[:k.rfind(':')]: v for k, v in
raw_pools.items()}
for db_pool_id in db_lb.get('pools', {}):
# If we saw this pool already on another listener, skip it.
if db_pool_id in processed_pools:
continue
db_pool_dict = db_lb['pools'][db_pool_id]
lb_status = self._process_pool_status(
session, db_pool_id, db_pool_dict, pools,
lb_status, processed_pools, potential_offline_pools)
if health_msg_version >= 2:
raw_pools = health['pools']
# normalize the pool IDs. Single process listener pools
# have the listener id appended with an ':' seperator.
# Old multi-process listener pools only have a pool ID.
# This makes sure the keys are only pool IDs.
pools = {(k + ' ')[:k.rfind(':')]: v for k, v in raw_pools.items()}
for db_pool_id in db_lb.get('pools', {}):
# If we saw this pool already, skip it.
if db_pool_id in processed_pools:
continue
db_pool_dict = db_lb['pools'][db_pool_id]
lb_status = self._process_pool_status(
session, db_pool_id, db_pool_dict, pools,
lb_status, processed_pools, potential_offline_pools)
for pool_id in potential_offline_pools:
# Skip if we eventually found a status for this pool
if pool_id in processed_pools:
continue
try:
# If the database doesn't already show the pool offline, update
if potential_offline_pools[pool_id] != constants.OFFLINE:
self._update_status(
session, self.pool_repo, constants.POOL,
pool_id, constants.OFFLINE,
potential_offline_pools[pool_id])
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Pool %s is not in DB", pool_id)
# Update the load balancer status last
try:
if lb_status != db_lb['operating_status']:
self._update_status(
session, self.loadbalancer_repo,
constants.LOADBALANCER, db_lb['id'], lb_status,
db_lb[constants.OPERATING_STATUS])
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Load balancer %s is not in DB", db_lb.id)
def _process_pool_status(
self, session, pool_id, db_pool_dict, pools, lb_status,
processed_pools, potential_offline_pools):
pool_status = None
if pool_id not in pools:
# If we don't have a status update for this pool_id
# add it to the list of potential offline pools and continue.
# We will check the potential offline pool list after we
# finish processing the status updates from all of the listeners.
potential_offline_pools[pool_id] = db_pool_dict['operating_status']
return lb_status
pool = pools[pool_id]
processed_pools.append(pool_id)
# UP = HAProxy backend has working or no servers
if pool.get('status') == constants.UP:
pool_status = constants.ONLINE
# DOWN = HAProxy backend has no working servers
elif pool.get('status') == constants.DOWN:
pool_status = constants.ERROR
lb_status = constants.ERROR
else:
LOG.warning(('Pool %(pool)s reported status of '
'%(status)s'),
{'pool': pool_id,
'status': pool.get('status')})
# Deal with the members that are reporting from
# the Amphora
members = pool['members']
for member_id in db_pool_dict.get('members', {}):
member_status = None
member_db_status = (
db_pool_dict['members'][member_id]['operating_status'])
if member_id not in members:
if member_db_status != constants.NO_MONITOR:
member_status = constants.OFFLINE
else:
status = members[member_id]
# Member status can be "UP" or "UP #/#"
# (transitional)
if status.startswith(constants.UP):
member_status = constants.ONLINE
# Member status can be "DOWN" or "DOWN #/#"
# (transitional)
elif status.startswith(constants.DOWN):
member_status = constants.ERROR
if pool_status == constants.ONLINE:
pool_status = constants.DEGRADED
if lb_status == constants.ONLINE:
lb_status = constants.DEGRADED
elif status == constants.DRAIN:
member_status = constants.DRAINING
elif status == constants.MAINT:
member_status = constants.OFFLINE
elif status == constants.NO_CHECK:
member_status = constants.NO_MONITOR
elif status == constants.RESTARTING:
# RESTARTING means that keepalived is restarting and a down
# member has been detected, the real status of the member
# is not clear, it might mean that the checker hasn't run
# yet.
# In this case, keep previous member_status, and wait for a
# non-transitional status.
pass
else:
LOG.warning('Member %(mem)s reported '
'status of %(status)s',
{'mem': member_id,
'status': status})
try:
if (member_status is not None and
member_status != member_db_status):
self._update_status(
session, self.member_repo, constants.MEMBER,
member_id, member_status, member_db_status)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Member %s is not able to update "
"in DB", member_id)
try:
if (pool_status is not None and
pool_status != db_pool_dict['operating_status']):
self._update_status(
session, self.pool_repo, constants.POOL,
pool_id, pool_status, db_pool_dict['operating_status'])
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Pool %s is not in DB", pool_id)
return lb_status
class UpdateStatsDb(update_base.StatsUpdateBase, stats.StatsMixin):
def update_stats(self, health_message, srcaddr):
# The executor will eat any exceptions from the update_stats code
# so we need to wrap it and log the unhandled exception
try:
self._update_stats(health_message, srcaddr)
except Exception:
LOG.exception('update_stats encountered an unknown error '
'processing stats for amphora %s with IP '
'%s', health_message['id'], srcaddr)
def _update_stat