Fix health manager performance regression
When running stress tests against the Octavia Health Manager it was observed that the scalability and performance of the health manager has degraded. It was observed that the ORM layer was forming poorly optimized queries, putting excessive load on the database engine and unnecessary code paths were executing for each heartbeat message. This patch optimizes the health manager processing of amphora-agent heartbeat messages by optimizing the database requests, pool processing, and event streamer code paths. Change-Id: I2f75715b09430ad139306d9196df0ec5d7a63da8 Story: 2001896 Task: 14381
This commit is contained in:
parent
2a2b308a39
commit
f13a2e6546
|
@ -47,43 +47,83 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
self.loadbalancer_repo = repo.LoadBalancerRepository()
|
||||
self.member_repo = repo.MemberRepository()
|
||||
self.pool_repo = repo.PoolRepository()
|
||||
self.sync_prv_status = CONF.health_manager.sync_provisioning_status
|
||||
|
||||
def emit(self, info_type, info_id, info_obj):
|
||||
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
|
||||
self.event_streamer.emit(cnt)
|
||||
|
||||
def _update_status_and_emit_event(self, session, repo, entity_type,
|
||||
entity_id, new_op_status, old_op_status,
|
||||
current_prov_status):
|
||||
entity_id, new_op_status, old_op_status):
|
||||
message = {}
|
||||
if old_op_status.lower() != new_op_status.lower():
|
||||
LOG.debug("%s %s status has changed from %s to "
|
||||
"%s. Updating db and sending event.",
|
||||
"%s, updating db.",
|
||||
entity_type, entity_id, old_op_status,
|
||||
new_op_status)
|
||||
repo.update(session, entity_id, operating_status=new_op_status)
|
||||
# Map the status for neutron-lbaas
|
||||
if new_op_status == constants.DRAINING:
|
||||
new_op_status = constants.ONLINE
|
||||
message.update({constants.OPERATING_STATUS: new_op_status})
|
||||
if self.sync_prv_status:
|
||||
LOG.debug("%s %s provisioning_status %s. "
|
||||
"Sending event.",
|
||||
entity_type, entity_id, current_prov_status)
|
||||
message.update(
|
||||
{constants.PROVISIONING_STATUS: current_prov_status})
|
||||
if message:
|
||||
self.emit(entity_type, entity_id, message)
|
||||
if (CONF.health_manager.event_streamer_driver !=
|
||||
constants.NOOP_EVENT_STREAMER):
|
||||
if CONF.health_manager.sync_provisioning_status:
|
||||
current_prov_status = repo.get(
|
||||
session, id=entity_id).provisioning_status
|
||||
LOG.debug("%s %s provisioning_status %s. "
|
||||
"Sending event.",
|
||||
entity_type, entity_id, current_prov_status)
|
||||
message.update(
|
||||
{constants.PROVISIONING_STATUS: current_prov_status})
|
||||
if message:
|
||||
self.emit(entity_type, entity_id, message)
|
||||
|
||||
def update_health(self, health, srcaddr):
|
||||
# The executor will eat any exceptions from the update_health code
|
||||
# so we need to wrap it and log the unhandled exception
|
||||
start_time = timeit.default_timer()
|
||||
try:
|
||||
self._update_health(health, srcaddr)
|
||||
except Exception:
|
||||
LOG.exception('update_health encountered an unknown error '
|
||||
'processing health message for amphora {0} with IP '
|
||||
'{1}'.format(health['id'], srcaddr))
|
||||
except Exception as e:
|
||||
LOG.exception('Health update for amphora %(amp)s encountered '
|
||||
'error %(err)s. Skipping health update.',
|
||||
{'amp': health['id'], 'err': str(e)})
|
||||
# TODO(johnsom) We need to set a warning threshold here
|
||||
LOG.debug('Health Update finished in: {0} seconds'.format(
|
||||
timeit.default_timer() - start_time))
|
||||
|
||||
# Health heartbeat messsage pre-versioning with UDP listeners
|
||||
# need to adjust the expected listener count
|
||||
# This is for backward compatibility with Rocky pre-versioning
|
||||
# heartbeat amphora.
|
||||
def _update_listener_count_for_UDP(self, session, lb_id,
|
||||
expected_listener_count):
|
||||
lb_db_obj = self.loadbalancer_repo.get(session, id=lb_id)
|
||||
|
||||
# For udp listener, the udp health won't send out by amp agent.
|
||||
# Once the default_pool of udp listener have the first enabled
|
||||
# member, then the health will be sent out. So during this
|
||||
# period, need to figure out the udp listener and ignore them
|
||||
# by changing expected_listener_count.
|
||||
for listener in lb_db_obj.listeners:
|
||||
need_remove = False
|
||||
if listener.protocol == constants.PROTOCOL_UDP:
|
||||
enabled_members = ([member
|
||||
for member in
|
||||
listener.default_pool.members
|
||||
if member.enabled]
|
||||
if listener.default_pool else [])
|
||||
if listener.default_pool:
|
||||
if not listener.default_pool.members:
|
||||
need_remove = True
|
||||
elif not enabled_members:
|
||||
need_remove = True
|
||||
else:
|
||||
need_remove = True
|
||||
|
||||
if need_remove:
|
||||
expected_listener_count = expected_listener_count - 1
|
||||
return expected_listener_count
|
||||
|
||||
def _update_health(self, health, srcaddr):
|
||||
"""This function is to update db info based on amphora status
|
||||
|
@ -108,43 +148,25 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
}
|
||||
|
||||
"""
|
||||
start_time = timeit.default_timer()
|
||||
session = db_api.get_session()
|
||||
|
||||
# We need to see if all of the listeners are reporting in
|
||||
db_lb = self.amphora_repo.get_lb_for_amphora(session, health['id'])
|
||||
db_lb = self.amphora_repo.get_lb_for_health_update(session,
|
||||
health['id'])
|
||||
ignore_listener_count = False
|
||||
listeners = health['listeners']
|
||||
|
||||
if db_lb:
|
||||
expected_listener_count = len(db_lb.listeners)
|
||||
|
||||
# For udp listener, the udp health won't send out by amp agent.
|
||||
# Once the default_pool of udp listener have the first enabled
|
||||
# member, then the health will be sent out. So during this period,
|
||||
# need to figure out the udp listener and ignore them by changing
|
||||
# expected_listener_count.
|
||||
for listener in db_lb.listeners:
|
||||
need_remove = False
|
||||
if listener.protocol == constants.PROTOCOL_UDP:
|
||||
enabled_members = ([member
|
||||
for member in
|
||||
listener.default_pool.members
|
||||
if member.enabled]
|
||||
if listener.default_pool else [])
|
||||
if listener.default_pool:
|
||||
if not listener.default_pool.members:
|
||||
need_remove = True
|
||||
elif not enabled_members:
|
||||
need_remove = True
|
||||
else:
|
||||
need_remove = True
|
||||
|
||||
if need_remove:
|
||||
expected_listener_count = expected_listener_count - 1
|
||||
|
||||
if 'PENDING' in db_lb.provisioning_status:
|
||||
expected_listener_count = len(db_lb.get('listeners', {}))
|
||||
if 'PENDING' in db_lb['provisioning_status']:
|
||||
ignore_listener_count = True
|
||||
else:
|
||||
|
||||
# If this is a heartbeat older than versioning, handle
|
||||
# UDP special for backward compatibility.
|
||||
if 'ver' not in health:
|
||||
expected_listener_count = (
|
||||
self._update_listener_count_for_UDP(
|
||||
session, db_lb['id'], expected_listener_count))
|
||||
else:
|
||||
# If this is not a spare amp, log and skip it.
|
||||
amp = self.amphora_repo.get(session, id=health['id'])
|
||||
|
@ -160,6 +182,8 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
return
|
||||
expected_listener_count = 0
|
||||
|
||||
listeners = health['listeners']
|
||||
|
||||
# Do not update amphora health if the reporting listener count
|
||||
# does not match the expected listener count
|
||||
if len(listeners) == expected_listener_count or ignore_listener_count:
|
||||
|
@ -169,6 +193,9 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
# if we're running too far behind, warn and bail
|
||||
proc_delay = time.time() - health['recv_time']
|
||||
hb_interval = CONF.health_manager.heartbeat_interval
|
||||
# TODO(johnsom) We need to set a warning threshold here, and
|
||||
# escalate to critical when it reaches the
|
||||
# heartbeat_interval
|
||||
if proc_delay >= hb_interval:
|
||||
LOG.warning('Amphora %(id)s health message was processed too '
|
||||
'slowly: %(delay)ss! The system may be overloaded '
|
||||
|
@ -198,16 +225,17 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
return
|
||||
|
||||
processed_pools = []
|
||||
potential_offline_pools = {}
|
||||
|
||||
# We got a heartbeat so lb is healthy until proven otherwise
|
||||
if db_lb.enabled is False:
|
||||
if db_lb['enabled'] is False:
|
||||
lb_status = constants.OFFLINE
|
||||
else:
|
||||
lb_status = constants.ONLINE
|
||||
|
||||
for db_listener in db_lb.listeners:
|
||||
for listener_id in db_lb.get('listeners', {}):
|
||||
db_op_status = db_lb['listeners'][listener_id]['operating_status']
|
||||
listener_status = None
|
||||
listener_id = db_listener.id
|
||||
listener = None
|
||||
|
||||
if listener_id not in listeners:
|
||||
|
@ -230,13 +258,11 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
'status': listener.get('status')})
|
||||
|
||||
try:
|
||||
if listener_status is not None:
|
||||
if (listener_status is not None and
|
||||
listener_status != db_op_status):
|
||||
self._update_status_and_emit_event(
|
||||
session, self.listener_repo, constants.LISTENER,
|
||||
listener_id, listener_status,
|
||||
db_listener.operating_status,
|
||||
db_listener.provisioning_status
|
||||
)
|
||||
listener_id, listener_status, db_op_status)
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error("Listener %s is not in DB", listener_id)
|
||||
|
||||
|
@ -245,38 +271,52 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
|
||||
pools = listener['pools']
|
||||
|
||||
# Process pools bound to listeners
|
||||
for db_pool in db_listener.pools:
|
||||
for db_pool_id in db_lb.get('pools', {}):
|
||||
# If we saw this pool already on another listener
|
||||
# skip it.
|
||||
if db_pool_id in processed_pools:
|
||||
continue
|
||||
db_pool_dict = db_lb['pools'][db_pool_id]
|
||||
lb_status = self._process_pool_status(
|
||||
session, db_pool, pools, lb_status, processed_pools)
|
||||
session, db_pool_id, db_pool_dict, pools,
|
||||
lb_status, processed_pools, potential_offline_pools)
|
||||
|
||||
# Process pools bound to the load balancer
|
||||
for db_pool in db_lb.pools:
|
||||
# Don't re-process pools shared with listeners
|
||||
if db_pool.id in processed_pools:
|
||||
for pool_id in potential_offline_pools:
|
||||
# Skip if we eventually found a status for this pool
|
||||
if pool_id in processed_pools:
|
||||
continue
|
||||
lb_status = self._process_pool_status(
|
||||
session, db_pool, [], lb_status, processed_pools)
|
||||
try:
|
||||
# If the database doesn't already show the pool offline, update
|
||||
if potential_offline_pools[pool_id] != constants.OFFLINE:
|
||||
self._update_status_and_emit_event(
|
||||
session, self.pool_repo, constants.POOL,
|
||||
pool_id, constants.OFFLINE,
|
||||
potential_offline_pools[pool_id])
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error("Pool %s is not in DB", pool_id)
|
||||
|
||||
# Update the load balancer status last
|
||||
try:
|
||||
self._update_status_and_emit_event(
|
||||
session, self.loadbalancer_repo,
|
||||
constants.LOADBALANCER, db_lb.id, lb_status,
|
||||
db_lb.operating_status, db_lb.provisioning_status
|
||||
)
|
||||
if lb_status != db_lb['operating_status']:
|
||||
self._update_status_and_emit_event(
|
||||
session, self.loadbalancer_repo,
|
||||
constants.LOADBALANCER, db_lb['id'], lb_status,
|
||||
db_lb[constants.OPERATING_STATUS])
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error("Load balancer %s is not in DB", db_lb.id)
|
||||
LOG.debug('Health Update finished in: {0} seconds'.format(
|
||||
timeit.default_timer() - start_time))
|
||||
|
||||
def _process_pool_status(self, session, db_pool, pools, lb_status,
|
||||
processed_pools):
|
||||
def _process_pool_status(
|
||||
self, session, pool_id, db_pool_dict, pools, lb_status,
|
||||
processed_pools, potential_offline_pools):
|
||||
pool_status = None
|
||||
pool_id = db_pool.id
|
||||
|
||||
if pool_id not in pools:
|
||||
pool_status = constants.OFFLINE
|
||||
# If we don't have a status update for this pool_id
|
||||
# add it to the list of potential offline pools and continue.
|
||||
# We will check the potential offline pool list after we
|
||||
# finish processing the status updates from all of the listeners.
|
||||
potential_offline_pools[pool_id] = db_pool_dict['operating_status']
|
||||
return lb_status
|
||||
else:
|
||||
pool = pools[pool_id]
|
||||
|
||||
|
@ -298,10 +338,10 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
# Deal with the members that are reporting from
|
||||
# the Amphora
|
||||
members = pool['members']
|
||||
for db_member in db_pool.members:
|
||||
for member_id in db_pool_dict.get('members', {}):
|
||||
member_status = None
|
||||
member_db_status = db_member.operating_status
|
||||
member_id = db_member.id
|
||||
member_db_status = (
|
||||
db_pool_dict['members'][member_id]['operating_status'])
|
||||
|
||||
if member_id not in members:
|
||||
if member_db_status != constants.NO_MONITOR:
|
||||
|
@ -334,25 +374,21 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
'status': status})
|
||||
|
||||
try:
|
||||
if member_status is not None:
|
||||
if (member_status is not None and
|
||||
member_status != member_db_status):
|
||||
self._update_status_and_emit_event(
|
||||
session, self.member_repo,
|
||||
constants.MEMBER,
|
||||
member_id, member_status,
|
||||
db_member.operating_status,
|
||||
db_member.provisioning_status
|
||||
)
|
||||
session, self.member_repo, constants.MEMBER,
|
||||
member_id, member_status, member_db_status)
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error("Member %s is not able to update "
|
||||
"in DB", member_id)
|
||||
|
||||
try:
|
||||
if pool_status is not None:
|
||||
if (pool_status is not None and
|
||||
pool_status != db_pool_dict['operating_status']):
|
||||
self._update_status_and_emit_event(
|
||||
session, self.pool_repo, constants.POOL,
|
||||
pool_id, pool_status, db_pool.operating_status,
|
||||
db_pool.provisioning_status
|
||||
)
|
||||
pool_id, pool_status, db_pool_dict['operating_status'])
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error("Pool %s is not in DB", pool_id)
|
||||
|
||||
|
|
|
@ -1031,6 +1031,78 @@ class AmphoraRepository(BaseRepository):
|
|||
|
||||
return amp.to_data_model()
|
||||
|
||||
def get_lb_for_health_update(self, session, amphora_id):
|
||||
"""This method is for the health manager status update process.
|
||||
|
||||
This is a time sensitive query that occurs often.
|
||||
It is an explicit query as the ORM produces a poorly
|
||||
optimized query.
|
||||
|
||||
Use extreme caution making any changes to this query
|
||||
as it can impact the scalability of the health manager.
|
||||
All changes should be analyzed using SQL "EXPLAIN" to
|
||||
make sure only indexes are being used.
|
||||
Changes should also be evaluated using the stressHM tool.
|
||||
|
||||
Note: The returned object is flat and not a graph representation
|
||||
of the load balancer as it is not needed. This is on
|
||||
purpose to optimize the processing time. This is not in
|
||||
the normal data model objects.
|
||||
|
||||
:param session: A Sql Alchemy database session.
|
||||
:param amphora_id: The amphora ID to lookup the load balancer for.
|
||||
:returns: A dictionary containing the required load balancer details.
|
||||
"""
|
||||
rows = session.execute(
|
||||
"SELECT load_balancer.id, load_balancer.enabled, "
|
||||
"load_balancer.provisioning_status AS lb_prov_status, "
|
||||
"load_balancer.operating_status AS lb_op_status, "
|
||||
"listener.id AS list_id, "
|
||||
"listener.operating_status AS list_op_status, "
|
||||
"pool.id AS pool_id, "
|
||||
"pool.operating_status AS pool_op_status, "
|
||||
"member.id AS member_id, "
|
||||
"member.operating_status AS mem_op_status from "
|
||||
"amphora JOIN load_balancer ON "
|
||||
"amphora.load_balancer_id = load_balancer.id LEFT JOIN "
|
||||
"listener ON load_balancer.id = listener.load_balancer_id "
|
||||
"LEFT JOIN pool ON load_balancer.id = pool.load_balancer_id "
|
||||
"LEFT JOIN member ON pool.id = member.pool_id WHERE "
|
||||
"amphora.id = :amp_id AND amphora.status != :deleted AND "
|
||||
"load_balancer.provisioning_status != :deleted;",
|
||||
{'amp_id': amphora_id, 'deleted': consts.DELETED}).fetchall()
|
||||
|
||||
lb = {}
|
||||
listeners = {}
|
||||
pools = {}
|
||||
for row in rows:
|
||||
if not lb:
|
||||
lb['id'] = row['id']
|
||||
lb['enabled'] = row['enabled'] == 1
|
||||
lb['provisioning_status'] = row['lb_prov_status']
|
||||
lb['operating_status'] = row['lb_op_status']
|
||||
if row['list_id'] and row['list_id'] not in listeners:
|
||||
listener = {'operating_status': row['list_op_status']}
|
||||
listeners[row['list_id']] = listener
|
||||
if row['pool_id']:
|
||||
if row['pool_id'] in pools and row['member_id']:
|
||||
member = {'operating_status': row['mem_op_status']}
|
||||
pools[row['pool_id']]['members'][row['member_id']] = member
|
||||
else:
|
||||
pool = {'operating_status': row['pool_op_status'],
|
||||
'members': {}}
|
||||
if row['member_id']:
|
||||
member = {'operating_status': row['mem_op_status']}
|
||||
pool['members'][row['member_id']] = member
|
||||
pools[row['pool_id']] = pool
|
||||
|
||||
if listeners:
|
||||
lb['listeners'] = listeners
|
||||
if pools:
|
||||
lb['pools'] = pools
|
||||
|
||||
return lb
|
||||
|
||||
|
||||
class AmphoraBuildReqRepository(BaseRepository):
|
||||
model_class = models.AmphoraBuildRequest
|
||||
|
|
|
@ -40,6 +40,9 @@ class BaseRepositoryTest(base.OctaviaDBTestBase):
|
|||
FAKE_UUID_2 = uuidutils.generate_uuid()
|
||||
FAKE_UUID_3 = uuidutils.generate_uuid()
|
||||
FAKE_UUID_4 = uuidutils.generate_uuid()
|
||||
FAKE_UUID_5 = uuidutils.generate_uuid()
|
||||
FAKE_UUID_6 = uuidutils.generate_uuid()
|
||||
FAKE_UUID_7 = uuidutils.generate_uuid()
|
||||
FAKE_EXP_AGE = 10
|
||||
|
||||
def setUp(self):
|
||||
|
@ -3110,6 +3113,81 @@ class AmphoraRepositoryTest(BaseRepositoryTest):
|
|||
self.assertEqual(cert_expired_amphora.cert_expiration, expiration)
|
||||
self.assertEqual(cert_expired_amphora.id, amphora2.id)
|
||||
|
||||
def test_get_lb_for_health_update(self):
|
||||
amphora1 = self.create_amphora(self.FAKE_UUID_1)
|
||||
amphora2 = self.create_amphora(self.FAKE_UUID_3)
|
||||
self.amphora_repo.associate(self.session, self.lb.id, amphora1.id)
|
||||
self.amphora_repo.associate(self.session, self.lb.id, amphora2.id)
|
||||
|
||||
lb_ref = {'enabled': True, 'id': self.lb.id,
|
||||
'operating_status': constants.ONLINE,
|
||||
'provisioning_status': constants.ACTIVE}
|
||||
|
||||
# Test with just a load balancer
|
||||
lb = self.amphora_repo.get_lb_for_health_update(self.session,
|
||||
self.FAKE_UUID_1)
|
||||
self.assertEqual(lb_ref, lb)
|
||||
|
||||
pool = self.pool_repo.create(
|
||||
self.session, id=self.FAKE_UUID_4, project_id=self.FAKE_UUID_2,
|
||||
name="pool_test", description="pool_description",
|
||||
protocol=constants.PROTOCOL_HTTP, load_balancer_id=self.lb.id,
|
||||
lb_algorithm=constants.LB_ALGORITHM_ROUND_ROBIN,
|
||||
provisioning_status=constants.ACTIVE,
|
||||
operating_status=constants.ONLINE, enabled=True)
|
||||
|
||||
pool_ref = {pool.id: {'members': {},
|
||||
'operating_status': constants.ONLINE}}
|
||||
lb_ref['pools'] = pool_ref
|
||||
|
||||
# Test with an LB and a pool
|
||||
lb = self.amphora_repo.get_lb_for_health_update(self.session,
|
||||
self.FAKE_UUID_1)
|
||||
self.assertEqual(lb_ref, lb)
|
||||
|
||||
listener = self.listener_repo.create(
|
||||
self.session, id=self.FAKE_UUID_5, project_id=self.FAKE_UUID_2,
|
||||
name="listener_name", description="listener_description",
|
||||
protocol=constants.PROTOCOL_HTTP, protocol_port=80,
|
||||
connection_limit=1, operating_status=constants.ONLINE,
|
||||
load_balancer_id=self.lb.id, provisioning_status=constants.ACTIVE,
|
||||
enabled=True, peer_port=1025, default_pool_id=pool.id)
|
||||
|
||||
listener_ref = {listener.id: {'operating_status': constants.ONLINE}}
|
||||
lb_ref['listeners'] = listener_ref
|
||||
|
||||
# Test with an LB, pool, and listener (no members)
|
||||
lb = self.amphora_repo.get_lb_for_health_update(self.session,
|
||||
self.FAKE_UUID_1)
|
||||
self.assertEqual(lb_ref, lb)
|
||||
|
||||
member1 = self.member_repo.create(self.session, id=self.FAKE_UUID_6,
|
||||
project_id=self.FAKE_UUID_2,
|
||||
pool_id=pool.id,
|
||||
ip_address="192.0.2.1",
|
||||
protocol_port=80, enabled=True,
|
||||
provisioning_status=constants.ACTIVE,
|
||||
operating_status=constants.ONLINE,
|
||||
backup=False)
|
||||
|
||||
member2 = self.member_repo.create(self.session, id=self.FAKE_UUID_7,
|
||||
project_id=self.FAKE_UUID_2,
|
||||
pool_id=pool.id,
|
||||
ip_address="192.0.2.21",
|
||||
protocol_port=80, enabled=True,
|
||||
provisioning_status=constants.ACTIVE,
|
||||
operating_status=constants.OFFLINE,
|
||||
backup=False)
|
||||
|
||||
member_ref = {member1.id: {'operating_status': constants.ONLINE},
|
||||
member2.id: {'operating_status': constants.OFFLINE}}
|
||||
lb_ref['pools'][pool.id]['members'] = member_ref
|
||||
|
||||
# Test with an LB, pool, listener, and members
|
||||
lb = self.amphora_repo.get_lb_for_health_update(self.session,
|
||||
self.FAKE_UUID_1)
|
||||
self.assertEqual(lb_ref, lb)
|
||||
|
||||
|
||||
class AmphoraHealthRepositoryTest(BaseRepositoryTest):
|
||||
def setUp(self):
|
||||
|
|
|
@ -44,9 +44,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
def setUp(self):
|
||||
super(TestUpdateHealthDb, self).setUp()
|
||||
|
||||
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
conf.config(group="health_manager",
|
||||
event_streamer_driver='queue_event_streamer')
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
self.conf.config(group="health_manager",
|
||||
event_streamer_driver='queue_event_streamer')
|
||||
|
||||
session_patch = mock.patch('octavia.db.api.get_session')
|
||||
self.addCleanup(session_patch.stop)
|
||||
|
@ -65,8 +65,6 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
self.pool_repo = mock.MagicMock()
|
||||
|
||||
self.hm.amphora_repo = self.amphora_repo
|
||||
fake_lb = mock.MagicMock()
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = fake_lb
|
||||
self.hm.amphora_health_repo = self.amphora_health_repo
|
||||
self.hm.listener_repo = self.listener_repo
|
||||
self.hm.listener_repo.count.return_value = 1
|
||||
|
@ -104,6 +102,7 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
mock_lb.pools = [mock_pool1]
|
||||
if mock_listener1:
|
||||
mock_listener1.pools = [mock_pool1]
|
||||
mock_listener1.default_pool = mock_pool1
|
||||
for i in range(members):
|
||||
mock_member_x = mock.Mock()
|
||||
mock_member_x.id = 'member-id-%s' % (i + 1)
|
||||
|
@ -116,13 +115,45 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
|
||||
return mock_lb, mock_listener1, mock_pool1, mock_members
|
||||
|
||||
def _make_fake_lb_health_dict(self, listener=True, pool=True,
|
||||
health_monitor=True, members=1,
|
||||
lb_prov_status=constants.ACTIVE):
|
||||
|
||||
lb_ref = {'enabled': True, 'id': self.FAKE_UUID_1,
|
||||
constants.OPERATING_STATUS: 'bogus',
|
||||
constants.PROVISIONING_STATUS: lb_prov_status}
|
||||
|
||||
if pool:
|
||||
members_dict = {}
|
||||
if health_monitor:
|
||||
member_operating_status = 'NOTHING_MATCHABLE'
|
||||
else:
|
||||
member_operating_status = constants.NO_MONITOR
|
||||
|
||||
for i in range(members):
|
||||
member_id = 'member-id-%s' % (i + 1)
|
||||
members_dict[member_id] = {
|
||||
constants.OPERATING_STATUS: member_operating_status}
|
||||
|
||||
pool_ref = {'pool-id-1': {'members': members_dict,
|
||||
constants.OPERATING_STATUS: 'bogus'}}
|
||||
lb_ref['pools'] = pool_ref
|
||||
|
||||
if listener:
|
||||
listener_ref = {'listener-id-1': {
|
||||
constants.OPERATING_STATUS: 'bogus'}}
|
||||
lb_ref['listeners'] = listener_ref
|
||||
|
||||
return lb_ref
|
||||
|
||||
def test_update_health_event_stream(self):
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {
|
||||
"listener-id-1": {"status": constants.OPEN, "pools": {
|
||||
"pool-id-1": {"status": constants.UP,
|
||||
"members": {"member-id-1": constants.UP}
|
||||
"members": {"member-id-1": constants.UP,
|
||||
"member-id-2": constants.UP}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -130,9 +161,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.event_client.cast.assert_any_call(
|
||||
|
@ -156,12 +186,11 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False)
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -173,13 +202,12 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=True, pool=False,
|
||||
lb_prov_status=constants.PENDING_UPDATE))
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(
|
||||
listener=True, pool=False, lb_prov_status=constants.PENDING_UPDATE)
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -191,12 +219,11 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=True, pool=False))
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(listener=True, pool=False)
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.assertFalse(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -208,12 +235,12 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time() - hb_interval - 1 # extra -1 for buffer
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False)
|
||||
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
# Receive time is stale, so we shouldn't see this called
|
||||
self.assertFalse(self.loadbalancer_repo.update.called)
|
||||
|
||||
|
@ -234,10 +261,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
|
||||
self.session_mock.commit.side_effect = TestException('boom')
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
self._make_mock_lb_tree())
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
@ -258,9 +284,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -285,10 +310,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
operating_status=constants.ONLINE)
|
||||
|
||||
# If the listener count is wrong, make sure we don't update
|
||||
mock_listener2 = mock.Mock()
|
||||
mock_listener2.id = 'listener-id-2'
|
||||
mock_listener2.pools = [mock_pool1]
|
||||
mock_lb.listeners = [mock_listener1, mock_listener2]
|
||||
lb_ref['listeners']['listener-id-2'] = {
|
||||
constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
self.amphora_health_repo.replace.reset_mock()
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
|
@ -304,10 +328,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -319,7 +342,7 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
self.session_mock, listener_id,
|
||||
operating_status=constants.ONLINE)
|
||||
self.pool_repo.update.assert_any_call(
|
||||
self.session_mock, mock_pool1.id,
|
||||
self.session_mock, 'pool-id-1',
|
||||
operating_status=constants.OFFLINE
|
||||
)
|
||||
|
||||
|
@ -339,22 +362,16 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
|
||||
mock_member2 = mock.Mock()
|
||||
mock_member2.id = 'member-id-2'
|
||||
mock_pool2 = mock.Mock()
|
||||
mock_pool2.id = "pool-id-2"
|
||||
mock_pool2.members = [mock_member2]
|
||||
mock_listener2 = mock.Mock()
|
||||
mock_listener2.id = 'listener-id-2'
|
||||
mock_listener2.pools = [mock_pool2]
|
||||
lb_ref['pools']['pool-id-2'] = {
|
||||
constants.OPERATING_STATUS: 'bogus',
|
||||
'members': {'member-id-2': {constants.OPERATING_STATUS: 'bogus'}}}
|
||||
|
||||
mock_lb.listeners.append(mock_listener2)
|
||||
mock_lb.pools.append(mock_pool2)
|
||||
lb_ref['listeners']['listener-id-2'] = {
|
||||
constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -368,11 +385,11 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
# Call count should be exactly 2, as each pool should be processed once
|
||||
self.assertEqual(2, self.pool_repo.update.call_count)
|
||||
self.pool_repo.update.assert_has_calls([
|
||||
mock.call(self.session_mock, mock_pool1.id,
|
||||
mock.call(self.session_mock, 'pool-id-1',
|
||||
operating_status=constants.ERROR),
|
||||
mock.call(self.session_mock, mock_pool2.id,
|
||||
mock.call(self.session_mock, 'pool-id-2',
|
||||
operating_status=constants.ONLINE)
|
||||
])
|
||||
], any_order=True)
|
||||
|
||||
def test_update_lb_and_list_pool_health_online(self):
|
||||
|
||||
|
@ -389,9 +406,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -431,9 +447,12 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
|
||||
lb_ref['pools']['pool-id-2'] = {
|
||||
constants.OPERATING_STATUS: constants.OFFLINE}
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -462,9 +481,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"members": {"member-id-1": constants.DRAIN}}}}},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -502,9 +520,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"members": {"member-id-1": constants.MAINT}}}}},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -542,9 +559,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"members": {"member-id-1": "blah"}}}}},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -578,9 +594,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
@ -621,9 +636,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(health_monitor=False))
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(health_monitor=False)
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
@ -659,11 +673,11 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(health_monitor=False))
|
||||
mock_members[0].admin_state_up = False
|
||||
mock_members[0].operating_status = constants.NO_MONITOR
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(health_monitor=False)
|
||||
member1 = lb_ref['pools']['pool-id-1']['members']['member-id-1']
|
||||
member1[constants.OPERATING_STATUS] = constants.NO_MONITOR
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
@ -683,7 +697,7 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
operating_status=constants.ONLINE)
|
||||
|
||||
self.member_repo.update.assert_any_call(
|
||||
self.session_mock, mock_members[0].id,
|
||||
self.session_mock, 'member-id-1',
|
||||
operating_status=constants.OFFLINE)
|
||||
|
||||
def test_update_health_member_no_check(self):
|
||||
|
@ -702,9 +716,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -743,11 +756,12 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"member-id-1": constants.UP}}}}},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(members=2))
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(members=2)
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
|
||||
# test listener, member
|
||||
for listener_id, listener in six.iteritems(
|
||||
|
@ -770,7 +784,7 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
self.session_mock, member_id,
|
||||
operating_status=constants.ONLINE)
|
||||
self.member_repo.update.assert_any_call(
|
||||
self.session_mock, mock_members[1].id,
|
||||
self.session_mock, 'member-id-2',
|
||||
operating_status=constants.OFFLINE)
|
||||
|
||||
def test_update_health_list_full_member_down(self):
|
||||
|
@ -788,9 +802,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -815,10 +828,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
self.session_mock, member_id,
|
||||
operating_status=constants.ERROR)
|
||||
|
||||
mock_listener2 = mock.Mock()
|
||||
mock_listener2.id = 'listener-id-2'
|
||||
mock_listener2.pools = [mock_pool1]
|
||||
mock_lb.listeners.append(mock_listener2)
|
||||
lb_ref['listeners']['listener-id-2'] = {
|
||||
constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
self.amphora_health_repo.replace.reset_mock()
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
|
@ -839,9 +851,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
@ -914,25 +925,24 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
# Build our own custom listeners/pools/members
|
||||
for i in [1, 2, 3, 4, 5]:
|
||||
mock_member = mock.Mock()
|
||||
mock_member.id = 'member-id-%s' % i
|
||||
mock_pool = mock.Mock()
|
||||
mock_pool.id = 'pool-id-%s' % i
|
||||
mock_pool.members = [mock_member]
|
||||
if i == 3:
|
||||
mock_member = mock.Mock()
|
||||
mock_member.id = 'member-id-31'
|
||||
mock_pool.members.append(mock_member)
|
||||
mock_listener = mock.Mock()
|
||||
mock_listener.id = 'listener-id-%s' % i
|
||||
mock_listener.pools = [mock_pool]
|
||||
mock_lb.listeners.append(mock_listener)
|
||||
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref['listeners']['listener-id-%s' % i] = {
|
||||
constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
if i == 3:
|
||||
members_dict = {'member-id-3': {
|
||||
constants.OPERATING_STATUS: 'bogus'}, 'member-id-31': {
|
||||
constants.OPERATING_STATUS: 'bogus'}}
|
||||
else:
|
||||
members_dict = {'member-id-%s' % i: {
|
||||
constants.OPERATING_STATUS: 'bogus'}}
|
||||
lb_ref['pools']['pool-id-%s' % i] = {
|
||||
'members': members_dict, constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
|
||||
|
@ -977,13 +987,15 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
self.hm.member_repo.update.side_effect = (
|
||||
[sqlalchemy.orm.exc.NoResultFound])
|
||||
self.hm.pool_repo.update.side_effect = (
|
||||
[sqlalchemy.orm.exc.NoResultFound])
|
||||
sqlalchemy.orm.exc.NoResultFound)
|
||||
self.hm.loadbalancer_repo.update.side_effect = (
|
||||
[sqlalchemy.orm.exc.NoResultFound])
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
|
||||
lb_ref['pools']['pool-id-2'] = {constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
@ -1026,15 +1038,18 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
|
||||
# Start everything ONLINE
|
||||
mock_members[0].operating_status = constants.ONLINE
|
||||
mock_pool1.operating_status = constants.ONLINE
|
||||
mock_listener1.operating_status = constants.ONLINE
|
||||
mock_lb.operating_status = constants.ONLINE
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref[constants.OPERATING_STATUS] = constants.ONLINE
|
||||
listener1 = lb_ref['listeners']['listener-id-1']
|
||||
listener1[constants.OPERATING_STATUS] = constants.ONLINE
|
||||
pool1 = lb_ref['pools']['pool-id-1']
|
||||
pool1[constants.OPERATING_STATUS] = constants.ONLINE
|
||||
member1 = pool1['members']['member-id-1']
|
||||
member1[constants.OPERATING_STATUS] = constants.ONLINE
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.event_client.cast.assert_not_called()
|
||||
|
@ -1049,34 +1064,32 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"listeners": {},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
mock_lb.enabled = False
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False)
|
||||
lb_ref['enabled'] = False
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.loadbalancer_repo.update.assert_called_with(
|
||||
self.mock_session(), mock_lb.id,
|
||||
self.mock_session(), self.FAKE_UUID_1,
|
||||
operating_status='OFFLINE')
|
||||
|
||||
def test_update_health_lb_admin_up(self):
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {},
|
||||
"recv_time": time.time()}
|
||||
"recv_time": time.time(),
|
||||
"ver": 1}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
mock_lb.enabled = True
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False)
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.loadbalancer_repo.update.assert_called_with(
|
||||
self.mock_session(), mock_lb.id,
|
||||
self.mock_session(), self.FAKE_UUID_1,
|
||||
operating_status='ONLINE')
|
||||
|
||||
def test_update_health_forbid_to_stale_udp_listener_amphora(self):
|
||||
|
@ -1124,21 +1137,120 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
mock_listener3])
|
||||
mock_lb.pools.extend([mock_pool1, mock_pool2])
|
||||
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
self.loadbalancer_repo.get.return_value = mock_lb
|
||||
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
lb_ref['listeners']['listener-id-2'] = {
|
||||
constants.OPERATING_STATUS: 'bogus'}
|
||||
lb_ref['listeners']['listener-id-3'] = {
|
||||
constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
def test_update_health_no_db_lb(self):
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = {}
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertFalse(self.amphora_health_repo.replace.called)
|
||||
|
||||
# Test missing amp in addition to missing lb DB record
|
||||
self.amphora_repo.get_lb_for_health_update.reset_mock()
|
||||
self.amphora_health_repo.replace.reset_mock()
|
||||
|
||||
mock_amphora = mock.MagicMock()
|
||||
mock_amphora.load_balancer_id = None
|
||||
self.amphora_repo.get.return_value = mock_amphora
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.amphora_repo.get.called)
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
def test_update_listener_count_for_UDP(self):
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
|
||||
mock_listener1.protocol = constants.PROTOCOL_TCP
|
||||
|
||||
self.hm.loadbalancer_repo.get.return_value = mock_lb
|
||||
|
||||
# Test only TCP listeners
|
||||
result = self.hm._update_listener_count_for_UDP('bogus_session', 1, 0)
|
||||
self.assertEqual(0, result)
|
||||
|
||||
# Test with a valid member
|
||||
mock_listener1.protocol = constants.PROTOCOL_UDP
|
||||
|
||||
result = self.hm._update_listener_count_for_UDP('bogus_session', 1, 1)
|
||||
self.assertEqual(1, result)
|
||||
|
||||
# Test with a disabled member
|
||||
mock_listener1.protocol = constants.PROTOCOL_UDP
|
||||
mock_members[0].enabled = False
|
||||
|
||||
result = self.hm._update_listener_count_for_UDP('bogus_session', 1, 1)
|
||||
self.assertEqual(0, result)
|
||||
|
||||
def test_update_status_and_emit_event(self):
|
||||
|
||||
# Test update with the same operating status
|
||||
self.conf.config(group="health_manager",
|
||||
event_streamer_driver=constants.NOOP_EVENT_STREAMER)
|
||||
self.hm._update_status_and_emit_event(
|
||||
'fake_session', self.loadbalancer_repo, constants.LOADBALANCER,
|
||||
1, 'ONLINE', 'ONLINE')
|
||||
self.assertFalse(self.loadbalancer_repo.update.called)
|
||||
self.assertFalse(self.event_client.cast.called)
|
||||
|
||||
self.conf.config(group="health_manager",
|
||||
event_streamer_driver='queue_event_streamer',
|
||||
sync_provisioning_status=True)
|
||||
|
||||
self.loadbalancer_repo.update.reset_mock()
|
||||
self.event_client.reset_mock()
|
||||
|
||||
# Test stream with provisioning sync
|
||||
self.hm._update_status_and_emit_event(
|
||||
'fake_session', self.loadbalancer_repo, constants.LOADBALANCER,
|
||||
1, 'ONLINE', 'OFFLINE')
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.assertTrue(self.event_client.cast.called)
|
||||
|
||||
self.conf.config(group="health_manager",
|
||||
sync_provisioning_status=False)
|
||||
|
||||
self.loadbalancer_repo.update.reset_mock()
|
||||
self.event_client.reset_mock()
|
||||
|
||||
# Test stream with no provisioning sync
|
||||
self.hm._update_status_and_emit_event(
|
||||
'fake_session', self.loadbalancer_repo, constants.LOADBALANCER,
|
||||
1, 'ONLINE', 'ONLINE')
|
||||
self.assertFalse(self.loadbalancer_repo.update.called)
|
||||
self.assertFalse(self.event_client.cast.called)
|
||||
|
||||
|
||||
class TestUpdateStatsDb(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestUpdateStatsDb, self).setUp()
|
||||
|
||||
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
conf.config(group="health_manager",
|
||||
event_streamer_driver='queue_event_streamer')
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
self.conf.config(group="health_manager",
|
||||
event_streamer_driver='queue_event_streamer')
|
||||
|
||||
self.sm = update_db.UpdateStatsDb()
|
||||
self.event_client = mock.MagicMock()
|
||||
|
@ -1181,7 +1293,7 @@ class TestUpdateStatsDb(base.TestCase):
|
|||
self.loadbalancer_repo.get.return_value = self.loadbalancer
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
def test_update_stats(self, session):
|
||||
def test_update_stats(self, mock_session):
|
||||
|
||||
health = {
|
||||
"id": self.amphora_id,
|
||||
|
@ -1205,7 +1317,7 @@ class TestUpdateStatsDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
|
||||
session.return_value = 'blah'
|
||||
mock_session.return_value = 'blah'
|
||||
|
||||
self.sm.update_stats(health, '192.0.2.1')
|
||||
|
||||
|
@ -1242,3 +1354,39 @@ class TestUpdateStatsDb(base.TestCase):
|
|||
self.listener_stats.active_connections,
|
||||
'bytes_out': self.listener_stats.bytes_out,
|
||||
'request_errors': self.listener_stats.request_errors}})
|
||||
|
||||
# Test with noop streamer
|
||||
self.event_client.cast.reset_mock()
|
||||
self.conf.config(group="health_manager",
|
||||
event_streamer_driver=constants.NOOP_EVENT_STREAMER)
|
||||
|
||||
self.sm.update_stats(health, '192.0.2.1')
|
||||
|
||||
self.conf.config(group="health_manager",
|
||||
event_streamer_driver='queue_event_streamer')
|
||||
self.assertFalse(self.event_client.cast.called)
|
||||
|
||||
# Test with missing DB listener
|
||||
self.event_client.cast.reset_mock()
|
||||
self.sm.repo_listener.get.return_value = None
|
||||
|
||||
self.sm.update_stats(health, '192.0.2.1')
|
||||
|
||||
self.event_client.cast.assert_called_once_with(
|
||||
{}, 'update_info', container={
|
||||
'info_type': 'listener_stats',
|
||||
'info_id': self.listener_id,
|
||||
'info_payload': {
|
||||
'bytes_in': self.listener_stats.bytes_in,
|
||||
'total_connections':
|
||||
self.listener_stats.total_connections,
|
||||
'active_connections':
|
||||
self.listener_stats.active_connections,
|
||||
'bytes_out': self.listener_stats.bytes_out,
|
||||
'request_errors': self.listener_stats.request_errors}})
|
||||
|
||||
# Test with update failure
|
||||
self.event_client.cast.reset_mock()
|
||||
mock_session.side_effect = Exception
|
||||
self.sm.update_stats(health, '192.0.2.1')
|
||||
self.assertFalse(self.event_client.cast.called)
|
||||
|
|
Loading…
Reference in New Issue