Implementing EventStreamer
EvenStream will be used to serialize messages from the octavia database to neutron-lbaas database via oslo_messaging. Also renaming update mixin class since its not really a mixin. The health manager will make changes to the octavia database when members etc are marked as down and up etc which would result in databases that were not in sync between neutron-lbaas and octavia. A mechanism to communicate database changes from octavia back to neutron is required so this CR attempts to use a oslo_messaging system to communicate those changes, Docimpact - /etc/octavia.conf the user can set the option event_streamer_driver = neutron_event_streamer to setup a queue to connect to neutron-lbaas. if this option is left blank it will default to the noop_event_streamer which will do nothing effectively turning the Queue off. Co-Authored-By: Brandon Logan <brandon.logan@rackspace.com> Change-Id: I77a049dcc21e3ee6287e661e82365ab7b9c44562
This commit is contained in:
parent
0939f8224f
commit
c84021ac27
|
@ -46,6 +46,13 @@
|
|||
# health_check_interval = 3
|
||||
# sock_rlimit = 0
|
||||
|
||||
# EventStreamer options are
|
||||
# queue_event_streamer,
|
||||
# noop_event_streamer
|
||||
# event_streamer_driver = noop_event_streamer
|
||||
|
||||
|
||||
|
||||
[keystone_authtoken]
|
||||
# auth_uri = https://localhost:5000/v3
|
||||
# admin_user = octavia
|
||||
|
@ -175,6 +182,9 @@
|
|||
# Topic (i.e. Queue) Name
|
||||
# topic = octavia_prov
|
||||
|
||||
# Topic for octavia's events sent to a queue
|
||||
# event_stream_topic = neutron_lbaas_event
|
||||
|
||||
[house_keeping]
|
||||
# Interval in seconds to initiate spare amphora checks
|
||||
# spare_check_interval = 30
|
||||
|
|
|
@ -19,7 +19,7 @@ from octavia.amphorae.drivers import driver_base as driver_base
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LoggingMixIn(driver_base.HealthMixin, driver_base.StatsMixin):
|
||||
class LoggingUpdate(object):
|
||||
def update_stats(self, stats):
|
||||
LOG.debug("Amphora %s no-op, update stats %s",
|
||||
self.__class__.__name__, stats)
|
||||
|
|
|
@ -22,8 +22,7 @@ from oslo_reports import guru_meditation_report as gmr
|
|||
from octavia.amphorae.drivers.health import heartbeat_udp
|
||||
from octavia.common import service
|
||||
from octavia.controller.healthmanager import health_manager
|
||||
from octavia.controller.healthmanager import update_health_mixin
|
||||
from octavia.controller.healthmanager import update_stats_mixin
|
||||
from octavia.controller.healthmanager import update_db
|
||||
from octavia.i18n import _LI
|
||||
from octavia import version
|
||||
|
||||
|
@ -36,8 +35,8 @@ CONF.import_group('health_manager', 'octavia.common.config')
|
|||
def hm_listener():
|
||||
# TODO(german): steved'or load those drivers
|
||||
udp_getter = heartbeat_udp.UDPStatusGetter(
|
||||
update_health_mixin.UpdateHealthMixin(),
|
||||
update_stats_mixin.UpdateStatsMixin())
|
||||
update_db.UpdateHealthDb(),
|
||||
update_db.UpdateStatsDb())
|
||||
while True:
|
||||
udp_getter.check()
|
||||
|
||||
|
|
|
@ -143,11 +143,20 @@ healthmanager_opts = [
|
|||
default=[]),
|
||||
cfg.IntOpt('heartbeat_interval',
|
||||
default=10,
|
||||
help=_('Sleep time between sending hearthbeats.'))
|
||||
]
|
||||
help=_('Sleep time between sending hearthbeats.')),
|
||||
cfg.StrOpt('event_streamer_driver',
|
||||
help=_('Specifies which driver to use for the event_streamer '
|
||||
'for syncing the octavia and neutron_lbaas dbs. If you '
|
||||
'don\'t need to sync the database or are running '
|
||||
'octavia in stand alone mode use the '
|
||||
'noop_event_streamer'),
|
||||
default='noop_event_streamer')]
|
||||
|
||||
oslo_messaging_opts = [
|
||||
cfg.StrOpt('topic'),
|
||||
cfg.StrOpt('event_stream_topic',
|
||||
default='neutron_lbaas_event',
|
||||
help=_('topic name for communicating events through a queue')),
|
||||
]
|
||||
|
||||
keystone_authtoken_v3_opts = [
|
||||
|
|
|
@ -31,6 +31,9 @@ HEALTH_MONITOR_HTTPS = 'HTTPS'
|
|||
SUPPORTED_HEALTH_MONITOR_TYPES = (HEALTH_MONITOR_HTTP, HEALTH_MONITOR_HTTPS,
|
||||
HEALTH_MONITOR_PING, HEALTH_MONITOR_TCP)
|
||||
|
||||
UPDATE_STATS = 'UPDATE_STATS'
|
||||
UPDATE_HEALTH = 'UPDATE_HEALTH'
|
||||
|
||||
PROTOCOL_TCP = 'TCP'
|
||||
PROTOCOL_HTTP = 'HTTP'
|
||||
PROTOCOL_HTTPS = 'HTTPS'
|
||||
|
@ -67,6 +70,7 @@ OFFLINE = 'OFFLINE'
|
|||
DEGRADED = 'DEGRADED'
|
||||
ERROR = 'ERROR'
|
||||
NO_MONITOR = 'NO_MONITOR'
|
||||
OPERATING_STATUS = 'operating_status'
|
||||
SUPPORTED_OPERATING_STATUSES = (ONLINE, OFFLINE, DEGRADED, ERROR, NO_MONITOR)
|
||||
|
||||
AMPHORA_VM = 'VM'
|
||||
|
|
|
@ -14,25 +14,31 @@
|
|||
|
||||
import datetime
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
import sqlalchemy
|
||||
from stevedore import driver as stevedore_driver
|
||||
|
||||
from octavia.amphorae.drivers import driver_base as driver_base
|
||||
from octavia.common import constants
|
||||
from octavia.controller.healthmanager import update_serializer
|
||||
from octavia.controller.queue import event_queue
|
||||
from octavia.db import api as db_api
|
||||
from octavia.db import repositories as repo
|
||||
from octavia.i18n import _LE, _LW
|
||||
|
||||
import six
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UpdateHealthMixin(driver_base.HealthMixin):
|
||||
|
||||
class UpdateHealthDb(object):
|
||||
def __init__(self):
|
||||
super(UpdateHealthMixin, self).__init__()
|
||||
super(UpdateHealthDb, self).__init__()
|
||||
# first setup repo for amphora, listener,member(nodes),pool repo
|
||||
self.event_streamer = stevedore_driver.DriverManager(
|
||||
namespace='octavia.controller.queues',
|
||||
name=cfg.CONF.health_manager.event_streamer_driver,
|
||||
invoke_on_load=True).driver
|
||||
self.amphora_repo = repo.AmphoraRepository()
|
||||
self.amphora_health_repo = repo.AmphoraHealthRepository()
|
||||
self.listener_repo = repo.ListenerRepository()
|
||||
|
@ -40,6 +46,23 @@ class UpdateHealthMixin(driver_base.HealthMixin):
|
|||
self.member_repo = repo.MemberRepository()
|
||||
self.pool_repo = repo.PoolRepository()
|
||||
|
||||
def emit(self, info_type, info_id, info_obj):
|
||||
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
|
||||
self.event_streamer.emit(cnt)
|
||||
|
||||
def _update_status_and_emit_event(self, session, repo, entity_type,
|
||||
entity_id, new_op_status):
|
||||
entity = repo.get(session, id=entity_id)
|
||||
if entity.operating_status.lower() != new_op_status.lower():
|
||||
LOG.debug("%s %s status has changed from %s to "
|
||||
"%s. Updating db and sending event.",
|
||||
entity_type, entity_id, entity.operating_status,
|
||||
new_op_status)
|
||||
repo.update(session, entity_id, operating_status=new_op_status)
|
||||
self.emit(
|
||||
entity_type, entity_id,
|
||||
{constants.OPERATING_STATUS: new_op_status})
|
||||
|
||||
def update_health(self, health):
|
||||
"""This function is to update db info based on amphora status
|
||||
|
||||
|
@ -115,9 +138,10 @@ class UpdateHealthMixin(driver_base.HealthMixin):
|
|||
|
||||
try:
|
||||
if listener_status is not None:
|
||||
self.listener_repo.update(
|
||||
session, listener_id,
|
||||
operating_status=listener_status)
|
||||
self._update_status_and_emit_event(
|
||||
session, self.listener_repo, constants.LISTENER,
|
||||
listener_id, listener_status
|
||||
)
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error(_LE("Listener %s is not in DB"), listener_id)
|
||||
|
||||
|
@ -158,22 +182,78 @@ class UpdateHealthMixin(driver_base.HealthMixin):
|
|||
|
||||
try:
|
||||
if member_status is not None:
|
||||
self.member_repo.update(session, id=member_id,
|
||||
operating_status=(
|
||||
member_status))
|
||||
self._update_status_and_emit_event(
|
||||
session, self.member_repo, constants.MEMBER,
|
||||
member_id, member_status
|
||||
)
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error(_LE("Member %s is not able to update "
|
||||
"in DB"), member_id)
|
||||
|
||||
try:
|
||||
if pool_status is not None:
|
||||
self.pool_repo.update(session, pool_id,
|
||||
operating_status=pool_status)
|
||||
self._update_status_and_emit_event(
|
||||
session, self.pool_repo, constants.POOL,
|
||||
pool_id, pool_status
|
||||
)
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error(_LE("Pool %s is not in DB"), pool_id)
|
||||
|
||||
try:
|
||||
self.loadbalancer_repo.update(session, lb_id,
|
||||
operating_status=lb_status)
|
||||
self._update_status_and_emit_event(
|
||||
session, self.loadbalancer_repo,
|
||||
constants.LOADBALANCER, lb_id, lb_status
|
||||
)
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error(_LE("Load balancer %s is not in DB"), lb_id)
|
||||
|
||||
|
||||
class UpdateStatsDb(object):
|
||||
|
||||
def __init__(self):
|
||||
super(UpdateStatsDb, self).__init__()
|
||||
self.listener_stats_repo = repo.ListenerStatisticsRepository()
|
||||
self.event_streamer = event_queue.EventStreamerNeutron()
|
||||
|
||||
def emit(self, info_type, info_id, info_obj):
|
||||
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
|
||||
self.event_streamer.emit(cnt)
|
||||
|
||||
def update_stats(self, health_message):
|
||||
"""This function is to update the db with listener stats
|
||||
|
||||
:param health_message: The health message containing the listener stats
|
||||
:type map: string
|
||||
:returns: null
|
||||
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {
|
||||
"listener-id-1": {"status": constants.OPEN,
|
||||
'stats': {'conns': 0,
|
||||
'totconns': 0,
|
||||
'rx': 0,
|
||||
'tx': 0},
|
||||
"pools": {
|
||||
"pool-id-1": {"status": constants.UP,
|
||||
"members": {"member-id-1": constants.ONLINE}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"""
|
||||
session = db_api.get_session()
|
||||
|
||||
listeners = health_message['listeners']
|
||||
for listener_id, listener in six.iteritems(listeners):
|
||||
|
||||
stats = listener.get('stats')
|
||||
stats = {'bytes_in': stats['rx'], 'bytes_out': stats['tx'],
|
||||
'active_connections': stats['conns'],
|
||||
'total_connections': stats['totconns']}
|
||||
LOG.debug("Updating listener stats in db and sending event.")
|
||||
LOG.debug("Listener %s stats: %s", listener_id, stats)
|
||||
self.listener_stats_repo.replace(session, listener_id, **stats)
|
||||
self.emit('listener_stats', listener_id, stats)
|
|
@ -0,0 +1,47 @@
|
|||
# Copyright 2014 Rackspace
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
|
||||
class InfoContainer(object):
|
||||
@staticmethod
|
||||
def from_dict(dict_obj):
|
||||
return InfoContainer(dict_obj['info_type'],
|
||||
dict_obj['info_id'],
|
||||
dict_obj['info_payload'])
|
||||
|
||||
def __init__(self, info_type, info_id, info_payload):
|
||||
self.info_type = copy.copy(info_type)
|
||||
self.info_id = copy.copy(info_id)
|
||||
self.info_payload = copy.deepcopy(info_payload)
|
||||
|
||||
def to_dict(self):
|
||||
return {'info_type': self.info_type,
|
||||
'info_id': self.info_id,
|
||||
'info_payload': self.info_payload}
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, InfoContainer):
|
||||
return False
|
||||
if self.info_type != other.info_type:
|
||||
return False
|
||||
if self.info_id != other.info_id:
|
||||
return False
|
||||
if self.info_payload != other.info_payload:
|
||||
return False
|
||||
return True
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self == other
|
|
@ -1,65 +0,0 @@
|
|||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from octavia.amphorae.drivers import driver_base as driver_base
|
||||
from octavia.db import api as db_api
|
||||
from octavia.db import repositories as repo
|
||||
|
||||
import six
|
||||
|
||||
|
||||
class UpdateStatsMixin(driver_base.StatsMixin):
|
||||
|
||||
def __init__(self):
|
||||
super(UpdateStatsMixin, self).__init__()
|
||||
self.listener_stats_repo = repo.ListenerStatisticsRepository()
|
||||
|
||||
def update_stats(self, health_message):
|
||||
"""This function is to update the db with listener stats
|
||||
|
||||
:param health_message: The health message containing the listener stats
|
||||
:type map: string
|
||||
:returns: null
|
||||
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {
|
||||
"listener-id-1": {"status": constants.OPEN,
|
||||
'stats': {'conns': 0,
|
||||
'totconns': 0,
|
||||
'rx': 0,
|
||||
'tx': 0},
|
||||
"pools": {
|
||||
"pool-id-1": {"status": constants.UP,
|
||||
"members": {"member-id-1": constants.ONLINE}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"""
|
||||
session = db_api.get_session()
|
||||
|
||||
listeners = health_message['listeners']
|
||||
for listener_id, listener in six.iteritems(listeners):
|
||||
|
||||
stats = listener.get('stats')
|
||||
|
||||
self.listener_stats_repo.replace(session, listener_id,
|
||||
bytes_in=stats['rx'],
|
||||
bytes_out=stats['tx'],
|
||||
active_connections=stats['conns'],
|
||||
total_connections=(
|
||||
stats['totconns']))
|
|
@ -0,0 +1,71 @@
|
|||
# Copyright 2015 Rackspace
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
import six
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class EventStreamerBase(object):
|
||||
"""Base class for EventStreamer
|
||||
|
||||
A stand in abstract class that defines what methods are stevedore loaded
|
||||
implementations of event streamer is expected to provide.
|
||||
"""
|
||||
@abc.abstractmethod
|
||||
def emit(self, cnt):
|
||||
"""method to send a DB event to neutron-lbaas if it is needed.
|
||||
|
||||
:param cnt: an InfoContainer container object
|
||||
:return: None
|
||||
"""
|
||||
|
||||
|
||||
class EventStreamerNoop(EventStreamerBase):
|
||||
"""Nop class implementation of EventStreamer
|
||||
|
||||
Usefull if your running in standalone mode and don't need to send
|
||||
updates to Neutron Lbaas
|
||||
"""
|
||||
|
||||
def emit(self, cnt):
|
||||
pass
|
||||
|
||||
|
||||
class EventStreamerNeutron(EventStreamerBase):
|
||||
"""Neutron LBaaS
|
||||
|
||||
When you're using Octavia alongside neutron LBaaS this class provides
|
||||
a mechanism to send updates to neutron LBaaS database via
|
||||
oslo_messaging queues.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
topic = cfg.CONF.oslo_messaging.event_stream_topic
|
||||
self.transport = oslo_messaging.get_transport(cfg.CONF)
|
||||
self.target = oslo_messaging.Target(topic=topic, exchange="common",
|
||||
namespace='control', fanout=False,
|
||||
version='1.0')
|
||||
self.client = oslo_messaging.RPCClient(self.transport, self.target)
|
||||
|
||||
def emit(self, cnt):
|
||||
LOG.debug("Emitting data to event streamer %s", cnt.to_dict())
|
||||
self.client.cast({}, 'update_info', container=cnt.to_dict())
|
|
@ -14,19 +14,19 @@
|
|||
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from octavia.amphorae.drivers.noop_driver import driver as driver
|
||||
from octavia.amphorae.drivers.noop_driver import driver
|
||||
from octavia.common import data_models
|
||||
from octavia.network import data_models as network_models
|
||||
from octavia.tests.unit import base as base
|
||||
from octavia.tests.unit import base
|
||||
|
||||
|
||||
FAKE_UUID_1 = uuidutils.generate_uuid()
|
||||
|
||||
|
||||
class LoggingMixIn(base.TestCase):
|
||||
class TestLoggingUpdate(base.TestCase):
|
||||
def setUp(self):
|
||||
super(LoggingMixIn, self).setUp()
|
||||
self.mixin = driver.LoggingMixIn()
|
||||
super(TestLoggingUpdate, self).setUp()
|
||||
self.mixin = driver.LoggingUpdate()
|
||||
|
||||
def test_update_stats(self):
|
||||
self.mixin.update_stats('test update stats')
|
||||
|
|
|
@ -24,9 +24,9 @@ class TestHealthManagerCMD(base.TestCase):
|
|||
super(TestHealthManagerCMD, self).setUp()
|
||||
|
||||
@mock.patch('octavia.controller.healthmanager.'
|
||||
'update_stats_mixin.UpdateStatsMixin')
|
||||
'update_db.UpdateStatsDb')
|
||||
@mock.patch('octavia.controller.healthmanager.'
|
||||
'update_health_mixin.UpdateHealthMixin')
|
||||
'update_db.UpdateHealthDb')
|
||||
@mock.patch('octavia.amphorae.drivers.health.'
|
||||
'heartbeat_udp.UDPStatusGetter')
|
||||
def test_hm_listener(self, mock_getter, mock_health, mock_stats):
|
||||
|
|
|
@ -12,23 +12,31 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import random
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
import sqlalchemy
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.healthmanager import update_health_mixin as healthmixin
|
||||
import octavia.tests.unit.base as base
|
||||
from octavia.common import data_models
|
||||
from octavia.controller.healthmanager import update_db
|
||||
from octavia.tests.unit import base
|
||||
|
||||
|
||||
class TestUpdateHealthMixin(base.TestCase):
|
||||
class TestUpdateHealthDb(base.TestCase):
|
||||
FAKE_UUID_1 = uuidutils.generate_uuid()
|
||||
|
||||
def setUp(self):
|
||||
super(TestUpdateHealthMixin, self).setUp()
|
||||
self.hm = healthmixin.UpdateHealthMixin()
|
||||
|
||||
super(TestUpdateHealthDb, self).setUp()
|
||||
cfg.CONF.set_override(group='health_manager',
|
||||
name='event_streamer_driver',
|
||||
override='queue_event_streamer')
|
||||
self.hm = update_db.UpdateHealthDb()
|
||||
self.event_client = mock.MagicMock()
|
||||
self.hm.event_streamer.client = self.event_client
|
||||
self.amphora_repo = mock.MagicMock()
|
||||
self.amphora_health_repo = mock.MagicMock()
|
||||
self.listener_repo = mock.MagicMock()
|
||||
|
@ -46,6 +54,33 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
self.hm.member_repo = self.member_repo
|
||||
self.hm.pool_repo = self.pool_repo
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
def test_update_health_event_stream(self, session):
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {
|
||||
"listener-id-1": {"status": constants.OPEN, "pools": {
|
||||
"pool-id-1": {"status": constants.UP,
|
||||
"members": {"member-id-1": constants.UP}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.hm.update_health(health)
|
||||
self.event_client.cast.assert_any_call(
|
||||
{}, 'update_info', container={
|
||||
'info_type': 'listener', 'info_id': 'listener-id-1',
|
||||
'info_payload': {'operating_status': 'ONLINE'}})
|
||||
self.event_client.cast.assert_any_call(
|
||||
{}, 'update_info', container={
|
||||
'info_type': 'member', 'info_id': 'member-id-1',
|
||||
'info_payload': {'operating_status': 'ONLINE'}})
|
||||
self.event_client.cast.assert_any_call(
|
||||
{}, 'update_info', container={
|
||||
'info_type': 'pool', 'info_id': 'pool-id-1',
|
||||
'info_payload': {'operating_status': 'ONLINE'}})
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
def test_update_health_Online(self, session):
|
||||
|
||||
|
@ -62,7 +97,6 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
}
|
||||
|
||||
session.return_value = 'blah'
|
||||
|
||||
self.hm.update_health(health)
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -81,7 +115,7 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
for member_id, member in six.iteritems(
|
||||
pool.get('members', {})):
|
||||
self.member_repo.update.assert_any_call(
|
||||
'blah', id=member_id,
|
||||
'blah', member_id,
|
||||
operating_status=constants.ONLINE)
|
||||
|
||||
self.hm.listener_repo.count.return_value = 2
|
||||
|
@ -124,7 +158,7 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
pool.get('members', {})):
|
||||
|
||||
self.member_repo.update.assert_any_call(
|
||||
'blah', id=member_id,
|
||||
'blah', member_id,
|
||||
operating_status=constants.ERROR)
|
||||
|
||||
self.hm.listener_repo.count.return_value = 2
|
||||
|
@ -168,7 +202,7 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
pool.get('members', {})):
|
||||
|
||||
self.member_repo.update.assert_any_call(
|
||||
'blah', id=member_id,
|
||||
'blah', member_id,
|
||||
operating_status=constants.NO_MONITOR)
|
||||
|
||||
self.hm.listener_repo.count.return_value = 2
|
||||
|
@ -211,7 +245,7 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
pool.get('members', {})):
|
||||
|
||||
self.member_repo.update.assert_any_call(
|
||||
'blah', id=member_id,
|
||||
'blah', member_id,
|
||||
operating_status=constants.ERROR)
|
||||
|
||||
self.hm.listener_repo.count.return_value = 2
|
||||
|
@ -254,7 +288,7 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
pool.get('members', {})):
|
||||
|
||||
self.member_repo.update.assert_any_call(
|
||||
'blah', id=member_id, operating_status=constants.ERROR)
|
||||
'blah', member_id, operating_status=constants.ERROR)
|
||||
|
||||
# Test the logic code paths
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
|
@ -353,5 +387,100 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
pool.get('members', {})):
|
||||
|
||||
self.member_repo.update.assert_any_call(
|
||||
'blah', id=member_id,
|
||||
'blah', member_id,
|
||||
operating_status=constants.ONLINE)
|
||||
|
||||
def test_update_health_no_status_change(self):
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {
|
||||
"listener-id-1": {
|
||||
"status": constants.OPEN, "pools": {
|
||||
"pool-id-1": {
|
||||
"status": constants.UP, "members": {
|
||||
"member-id-1": constants.UP
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
db_lb = data_models.LoadBalancer(
|
||||
id=self.FAKE_UUID_1, operating_status=constants.ONLINE
|
||||
)
|
||||
db_listener = data_models.Listener(
|
||||
id='listener-id-', operating_status=constants.ONLINE,
|
||||
load_balancer_id=self.FAKE_UUID_1
|
||||
)
|
||||
db_pool = data_models.Pool(
|
||||
id='pool-id-1', operating_status=constants.ONLINE
|
||||
)
|
||||
db_member = data_models.Member(
|
||||
id='member-id-1', operating_status=constants.ONLINE
|
||||
)
|
||||
self.listener_repo.get.return_value = db_listener
|
||||
self.pool_repo.get.return_value = db_pool
|
||||
self.member_repo.get.return_value = db_member
|
||||
self.loadbalancer_repo.get.return_value = db_lb
|
||||
self.hm.update_health(health)
|
||||
self.event_client.cast.assert_not_called()
|
||||
self.loadbalancer_repo.update.assert_not_called()
|
||||
self.listener_repo.update.assert_not_called()
|
||||
self.pool_repo.update.assert_not_called()
|
||||
self.member_repo.update.assert_not_called()
|
||||
|
||||
|
||||
class TestUpdateStatsDb(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestUpdateStatsDb, self).setUp()
|
||||
self.sm = update_db.UpdateStatsDb()
|
||||
self.event_client = mock.MagicMock()
|
||||
self.sm.event_streamer.client = self.event_client
|
||||
|
||||
self.listener_stats_repo = mock.MagicMock()
|
||||
self.sm.listener_stats_repo = self.listener_stats_repo
|
||||
|
||||
self.bytes_in = random.randrange(1000000000)
|
||||
self.bytes_out = random.randrange(1000000000)
|
||||
self.active_conns = random.randrange(1000000000)
|
||||
self.total_conns = random.randrange(1000000000)
|
||||
self.loadbalancer_id = uuidutils.generate_uuid()
|
||||
self.listener_id = uuidutils.generate_uuid()
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
def test_update_stats(self, session):
|
||||
|
||||
health = {
|
||||
"id": self.loadbalancer_id,
|
||||
"listeners": {
|
||||
self.listener_id: {"status": constants.OPEN,
|
||||
"stats": {"conns": self.active_conns,
|
||||
"totconns": self.total_conns,
|
||||
"rx": self.bytes_in,
|
||||
"tx": self.bytes_out},
|
||||
"pools": {"pool-id-1":
|
||||
{"status": constants.UP,
|
||||
"members":
|
||||
{"member-id-1": constants.ONLINE}
|
||||
}
|
||||
}
|
||||
}}}
|
||||
|
||||
session.return_value = 'blah'
|
||||
|
||||
self.sm.update_stats(health)
|
||||
|
||||
self.listener_stats_repo.replace.assert_called_once_with(
|
||||
'blah', self.listener_id, bytes_in=self.bytes_in,
|
||||
bytes_out=self.bytes_out, active_connections=self.active_conns,
|
||||
total_connections=self.total_conns)
|
||||
self.event_client.cast.assert_called_once_with(
|
||||
{}, 'update_info', container={
|
||||
'info_type': 'listener_stats',
|
||||
'info_id': self.listener_id,
|
||||
'info_payload': {
|
||||
'bytes_in': self.bytes_in,
|
||||
'total_connections': self.total_conns,
|
||||
'active_connections': self.active_conns,
|
||||
'bytes_out': self.bytes_out}})
|
|
@ -0,0 +1,39 @@
|
|||
# Copyright 2014 Rackspace
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.healthmanager import update_serializer
|
||||
import octavia.tests.unit.base as base
|
||||
|
||||
|
||||
class TestUpdateSerializer(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestUpdateSerializer, self).setUp()
|
||||
|
||||
def test_serializer_from_dict_to_dict(self):
|
||||
obj_id = str(uuid.uuid4())
|
||||
obj_type = constants.UPDATE_HEALTH
|
||||
obj_payload = {'test': [1, 2, 3, 4], id: obj_id}
|
||||
obj = update_serializer.InfoContainer(obj_id, obj_type, obj_payload)
|
||||
cloned_obj = update_serializer.InfoContainer.from_dict(obj.to_dict())
|
||||
self.assertEqual(obj, cloned_obj)
|
||||
|
||||
obj_id = str(uuid.uuid4())
|
||||
obj_type = constants.UPDATE_HEALTH
|
||||
obj_payload = {'test': [3, 2, 1, 6], id: obj_id, 'x': {'y': 1}}
|
||||
obj = update_serializer.InfoContainer(obj_id, obj_type, obj_payload)
|
||||
cloned_obj = update_serializer.InfoContainer.from_dict(obj.to_dict())
|
||||
self.assertEqual(obj, cloned_obj)
|
|
@ -1,67 +0,0 @@
|
|||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import random
|
||||
|
||||
import mock
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.healthmanager import update_stats_mixin as statsmixin
|
||||
import octavia.tests.unit.base as base
|
||||
|
||||
|
||||
class TestUpdateStatsMixin(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestUpdateStatsMixin, self).setUp()
|
||||
|
||||
self.sm = statsmixin.UpdateStatsMixin()
|
||||
self.listener_stats_repo = mock.MagicMock()
|
||||
self.sm.listener_stats_repo = self.listener_stats_repo
|
||||
|
||||
self.bytes_in = random.randrange(1000000000)
|
||||
self.bytes_out = random.randrange(1000000000)
|
||||
self.active_conns = random.randrange(1000000000)
|
||||
self.total_conns = random.randrange(1000000000)
|
||||
self.loadbalancer_id = uuidutils.generate_uuid()
|
||||
self.listener_id = uuidutils.generate_uuid()
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
def test_update_stats(self, session):
|
||||
|
||||
health = {
|
||||
"id": self.loadbalancer_id,
|
||||
"listeners": {
|
||||
self.listener_id: {"status": constants.OPEN,
|
||||
"stats": {"conns": self.active_conns,
|
||||
"totconns": self.total_conns,
|
||||
"rx": self.bytes_in,
|
||||
"tx": self.bytes_out},
|
||||
"pools": {"pool-id-1":
|
||||
{"status": constants.UP,
|
||||
"members":
|
||||
{"member-id-1": constants.ONLINE}
|
||||
}
|
||||
}
|
||||
}}}
|
||||
|
||||
session.return_value = 'blah'
|
||||
|
||||
self.sm.update_stats(health)
|
||||
|
||||
self.listener_stats_repo.replace.assert_called_once_with(
|
||||
'blah', self.listener_id, bytes_in=self.bytes_in,
|
||||
bytes_out=self.bytes_out, active_connections=self.active_conns,
|
||||
total_connections=self.total_conns)
|
|
@ -50,6 +50,9 @@ octavia.amphora.drivers =
|
|||
amphora_noop_driver = octavia.amphorae.drivers.noop_driver.driver:NoopAmphoraLoadBalancerDriver
|
||||
amphora_haproxy_rest_driver = octavia.amphorae.drivers.haproxy.rest_api_driver:HaproxyAmphoraLoadBalancerDriver
|
||||
amphora_haproxy_ssh_driver = octavia.amphorae.drivers.haproxy.ssh_driver:HaproxyManager
|
||||
octavia.controller.queues =
|
||||
noop_event_streamer = octavia.controller.queue.neutron_queue:EventStreamerNoop
|
||||
queue_event_streamer = octavia.controller.queue.event_queue:EventStreamerNeutron
|
||||
octavia.compute.drivers =
|
||||
compute_noop_driver = octavia.compute.drivers.noop_driver.driver:NoopComputeDriver
|
||||
compute_nova_driver = octavia.compute.drivers.nova_driver:VirtualMachineManager
|
||||
|
|
Loading…
Reference in New Issue